"""Database-backed models for posts, comments, impressions and notifications."""

import io

from PIL import Image

from modules.user.info import table, auth
from modules.user.info import user_id as info_user_id
from modules.user.info import auth as info_auth
from modules.user.info import friend as info_friends
from modules.user.info import team as info_team
from modules.algorithms.uuid import generate as uuid_generate
from modules.algorithms.univ import dict_key_verify
from modules.data.config import read as config_read
from modules.data.database import connect as db_connect
from modules.data.datetime import timestamp
from modules.track.logging import status


class user_content(table):
    """Base class for records tied to a user, a post and/or a comment.

    The validating setters below store ``None`` whenever the value they are
    given has the wrong type or cannot be found in the database.
    """

    def __init__(self, user_id=None, username=None, occupation_id=None,
                 team_id=None, comment_id=None, post_id=None, content=None,
                 caption=None, *args, **kwargs):
        # Subclasses in this file assign allowed_columns before calling in;
        # getattr keeps a direct instantiation from raising AttributeError.
        if not getattr(self, 'allowed_columns', None):
            self.allowed_columns = ['post_id', 'content', 'caption',
                                    'username', 'team_id', 'date']
        super().__init__(user_id=user_id, username=username,
                         occupation_id=occupation_id, team_id=team_id,
                         post_id=post_id, content=content)
        self.post_id = post_id
        self.comment_id = comment_id
        self.content = content
        self.caption = caption

    @property
    def id(self):
        return self._id

    @id.setter
    def id(self, value):
        """Accept a user id string that exists in ``auth_credentials``.

        Side effect: when the user has a post for ``self.date``, ``post_id``
        is pointed at it.
        """
        if isinstance(value, str):
            self.cur.execute(
                "SELECT username FROM auth_credentials WHERE user_id = ?",
                (value,))
            if self.cur.fetchone():
                self.cur.execute(
                    "SELECT post_id FROM posts WHERE user_id=? AND date=?",
                    (value, self.date))
                rez = self.cur.fetchone()
                if rez:
                    self.post_id = rez[0]
            else:
                value = None
        else:
            value = None
        self._id = value

    @property
    def post_id(self):
        return self._post_id

    @post_id.setter
    def post_id(self, value):
        """Keep *value* only if a row with that post_id exists in ``posts``."""
        self.cur.execute("SELECT content FROM posts WHERE post_id = ?",
                         (value,))
        if not self.cur.fetchone():
            value = None
        self._post_id = value

    @property
    def comment_id(self):
        return self._comment_id

    @comment_id.setter
    def comment_id(self, value):
        """Keep *value* only if that comment_id exists in ``comments``."""
        self.cur.execute("SELECT content FROM comments WHERE comment_id=?",
                         (value,))
        if not self.cur.fetchone():
            value = None
        self._comment_id = value

    @property
    def occupation_id(self):
        return self._occupation_id

    @occupation_id.setter
    def occupation_id(self, value):
        """Validate the occupation and set ``team_id`` from the ``teams`` table.

        ``team_id`` becomes ``None`` when the occupation is unknown or no team
        row references it.
        """
        team_value = None
        self.cur.execute(
            "SELECT name FROM occupations WHERE occupation_id = ?", (value,))
        if not self.cur.fetchone():
            value = None
        else:
            self.cur.execute(
                "SELECT team_id FROM teams WHERE occupation_id = ?", (value,))
            rez = self.cur.fetchone()
            if rez:
                team_value = rez[0]
        self.team_id = team_value
        self._occupation_id = value

    @property
    def content(self):
        return self._content

    @content.setter
    def content(self, value):
        self._content = value if isinstance(value, str) else None


class notification(table):
    """Notifications plus the per-user delivery queue.

    Notification rows are stored in ``notifications``; one row per recipient
    is queued in ``notifications_sent``.
    """

    # Permission levels that may be used directly as a notification target.
    _LEVELS = ('member', 'management', 'admin')

    def __init__(self, user_id=None, username=None, notification_id=None,
                 target_id=None, title=None, content=None, expire_after=None):
        self.allowed_columns = ['notification_id', 'target_id', 'title',
                                'content', 'time_created']
        super().__init__(user_id=user_id, username=username)
        self.notification_id = notification_id
        self.target_id = target_id
        self.title = title
        self.content = content
        self.expire_after = expire_after

    @property
    def notification_id(self):
        return self._notification_id

    @notification_id.setter
    def notification_id(self, value):
        """Keep *value* only if that notification exists."""
        self.cur.execute(
            "SELECT notification_id FROM notifications WHERE notification_id=?",
            (value,))
        if not self.cur.fetchone():
            value = None
        self._notification_id = value

    @property
    def target_id(self):
        return self._target_id

    @target_id.setter
    def target_id(self, value):
        """Accept a user id, a team id, ``"all-<server_code>"`` or a level name."""
        self.cur.execute(
            "SELECT user_id FROM auth_credentials WHERE user_id=?", (value,))
        user = self.cur.fetchone()
        self.cur.execute("SELECT team_id FROM teams WHERE team_id=?", (value,))
        team = self.cur.fetchone()
        all_server = value == "all-" + self.server_code
        level = value in self._LEVELS
        if not (user or team or all_server or level):
            value = None
        self._target_id = value

    @property
    def title(self):
        return self._title

    @title.setter
    def title(self, value):
        self._title = value if isinstance(value, str) else None

    @property
    def content(self):
        return self._content

    @content.setter
    def content(self, value):
        self._content = value if isinstance(value, str) else None

    @property
    def expire_after(self):
        return self._expire_after

    @expire_after.setter
    def expire_after(self, value):
        # bool is an int subclass; the original exact-type check rejected it,
        # so it is still rejected here.
        is_number = (isinstance(value, (int, float))
                     and not isinstance(value, bool))
        self._expire_after = value if is_number else None

    def get_target_group(self, data=None):
        """Classify ``target_id`` as a server, user, team or level target.

        Returns ``{'type': ..., 'id': ...}``. For a user target ``id`` is the
        username; for every other type it is the target id itself. Returns
        ``None`` when there is no valid target id. If the id matches both a
        user and a team, the team classification wins.
        """
        info = {'type': None, 'id': None}
        if dict_key_verify(data, 'target_id'):
            self.target_id = data['target_id']
        if not self.target_id:
            return None

        if self.target_id == "all-" + self.server_code:
            # "all-<server_code>" addresses the whole server. The prefix plus
            # the server code avoids colliding with a user named "all" or
            # "server".
            info = {'type': 'server', 'id': self.target_id}
        elif self.target_id in self._LEVELS:
            info = {'type': 'level', 'id': self.target_id}
        else:
            self.cur.execute(
                "SELECT username FROM auth_credentials WHERE user_id = ?",
                (self.target_id,))
            if self.cur.fetchone():
                username = auth(user_id=self.target_id).get()['username']
                info = {'type': 'user', 'id': username}
            self.cur.execute("SELECT name FROM teams WHERE team_id = ?",
                             (self.target_id,))
            if self.cur.fetchone():
                info = {'type': 'team', 'id': self.target_id}
        return info

    def get_targets(self, data=None):
        """Resolve ``target_id`` to the users a notification should reach.

        The caller supplies one id; its kind (user, team, level, whole server)
        is worked out here. Returns ``{'targets': [{'user_id': ...}, ...]}``,
        ``{'targets': None}`` when nobody matches, or ``None`` when there is
        no valid target id.
        """
        info = {'targets': None}
        if dict_key_verify(data, 'target_id'):
            # Fixed: the original assigned an undefined name here.
            self.target_id = data['target_id']
        if not self.target_id:
            return None

        # Fixed: the original passed ``self`` as the data argument.
        target_group = self.get_target_group()
        group_type = target_group['type'] if target_group else None

        if group_type == "user":
            # In this branch target_id was matched against
            # auth_credentials.user_id, so it already is the user id.
            info['targets'] = [{'user_id': self.target_id}]
            return info

        if group_type == "server":
            self.cur.execute("SELECT user_id FROM profile")
        elif group_type == "team":
            self.cur.execute(
                "SELECT user_id FROM profile INNER JOIN teams USING(occupation_id) WHERE team_id=?",
                (target_group['id'],))
        elif group_type == "level":
            self.cur.execute(
                "SELECT user_id FROM auth_credentials WHERE level=?",
                (target_group['id'],))
        else:
            # Unclassified target: nothing was queried, so do not read stale
            # rows from the cursor.
            return info

        rez = self.cur.fetchall()
        if rez:
            info['targets'] = [{'user_id': target[0]} for target in rez]
        return info

    def _collect_notifications(self, notification_ids):
        """Fetch every notification in *notification_ids* with ``self.columns``.

        Ids that ``get_notification`` cannot resolve are skipped.
        """
        collected = []
        for notif_id in notification_ids:
            entry = notification(notification_id=notif_id)
            entry.columns = self.columns
            fetched = entry.get_notification()
            if fetched:
                collected.append(fetched['notifications'][0])
        return collected

    def get_unsent(self, data=None):
        """Return this user's queued notifications whose ``sent`` flag is false.

        Results are ordered by ``time_created``, largest first. Returns
        ``None`` when there is no valid user id.
        """
        if not self.id:
            return None
        info = {'notifications': None}
        self.cur.execute(
            "SELECT notification_id, time_created FROM notifications INNER JOIN notifications_sent USING(notification_id) WHERE user_id=? AND sent=?",
            (self.id, False))
        rez = self.cur.fetchall()
        if rez:
            queued = self._sort_notifications(rez)
            info['notifications'] = self._collect_notifications(queued)
        return info

    def _sort_notifications(self, notifs):
        """Return the ids from ``(notification_id, time_created)`` rows.

        Rows are ordered by ``time_created`` descending. The original
        single-pass swap loop did not produce a sorted result.
        """
        ordered = sorted(notifs, key=lambda row: row[1], reverse=True)
        return [row[0] for row in ordered]

    def get(self, data=None):
        """Fetch by notification id, else by user, else by target group.

        *data* may carry ``user_id``, ``target_id`` and ``notification_id``.
        Returns ``None`` when none of them is valid.
        """
        info = None
        if dict_key_verify(data, 'user_id'):
            self.id = data['user_id']
        if dict_key_verify(data, 'target_id'):
            self.target_id = data['target_id']
        if dict_key_verify(data, 'notification_id'):
            self.notification_id = data['notification_id']

        if self.notification_id:
            info = self.get_notification()
        elif self.id:
            info = self.get_user()
        elif self.target_id:
            # target_id was read above but never used by the original.
            info = self.get_group()
        else:
            status("WARN", "Unable to fetch notifications, must provide a valid user or notification ID", self.statface)
        return info

    def get_user(self):
        """Return every notification queued for the current user."""
        if not self.id:
            status("WARN", "Unable to fetch notifications, invalid user provided", self.statface)
            return None
        info = {'notifications': None}
        self.cur.execute(
            "SELECT notification_id FROM notifications_sent WHERE user_id = ?",
            (self.id,))
        rez = self.cur.fetchall()
        if rez:
            info['notifications'] = self._collect_notifications(
                row[0] for row in rez)
        else:
            status("WARN", "No notifications exist for this user", self.statface)
        return info

    def get_group(self, data=None):
        """Return every notification addressed to ``target_id``.

        Returns ``None`` when there is no valid target id.
        """
        if not self.target_id:
            return None
        info = {'notifications': None}
        self.cur.execute(
            "SELECT notification_id FROM notifications WHERE target_id=?",
            (self.target_id,))
        rez = self.cur.fetchall()
        if rez:
            info['notifications'] = self._collect_notifications(
                row[0] for row in rez)
        # Fixed: the original returned None unconditionally.
        return info

    def get_notification(self, data=None):
        """Return the requested columns of the current notification.

        The ``target_id`` column is reported as resolved by
        ``get_target_group`` (a username for user targets). Resolving it also
        assigns ``self.target_id``, as in the original code.
        """
        if not self.notification_id:
            status("WARN", "Notification unable to be fetched, invalid notification ID provided", self.statface)
            return None

        info = {'notifications': [{column: None for column in self.columns}]}
        if not self.columns:
            status("WARN", "No data requested to be fetched, check inputs", self.statface)
        # NOTE(review): column names are interpolated into the SQL text;
        # presumably self.columns is restricted to allowed_columns - confirm.
        for column in self.columns:
            self.cur.execute(
                f"SELECT {column} FROM notifications WHERE notification_id = ?",
                (self.notification_id,))
            rez = self.cur.fetchone()
            if not rez:
                status("WARN", f"Notification {column} unable to be fetched, something went wrong", self.statface)
                continue
            info_item = rez[0]
            if column == "target_id":
                group = self.get_target_group({'target_id': info_item})
                if group:
                    info_item = group['id']
            # Fixed: the original stored rez[0], discarding the resolved id.
            info['notifications'][0][column] = info_item
        return info

    def load_notification(self, data=None):
        """Queue a notification for each of its targets in ``notifications_sent``.

        Uses ``self.target_id`` when set, otherwise the ``target_id`` stored
        with the notification.
        """
        if dict_key_verify(data, 'notification_id'):
            self.notification_id = data['notification_id']
        if not self.notification_id:
            return

        target_id = self.target_id
        if not target_id:
            self.cur.execute(
                "SELECT target_id FROM notifications WHERE notification_id = ?",
                (self.notification_id,))
            row = self.cur.fetchone()
            target_id = row[0] if row else None

        resolver = notification(notification_id=self.notification_id)
        resolver.target_id = target_id
        resolved = resolver.get_targets()
        target_data = resolved['targets'] if resolved else None
        if target_data:
            for target in target_data:
                self.cur.execute(
                    "INSERT INTO notifications_sent (notification_id, user_id) VALUES (?, ?)",
                    (self.notification_id, target['user_id']))
            self.db.commit()

    def create(self, data=None):
        """Insert a notification and queue it for its targets.

        *data* may carry ``target_id``, ``title``, ``content`` and
        ``expire_after``. A valid title and target are required. Content
        defaults to the title, and ``expire_after`` to the configured default.
        """
        if dict_key_verify(data, 'target_id'):
            self.target_id = data['target_id']
        if dict_key_verify(data, 'title'):
            self.title = data['title']
        if dict_key_verify(data, 'content'):
            self.content = data['content']
        if dict_key_verify(data, 'expire_after'):
            self.expire_after = data['expire_after']

        if not (self.title and self.target_id):
            status("FAIL", "Unable to create notification, invalid title or target ID provided", self.statface)
            return

        notification_id = uuid_generate()
        time_created = timestamp().now
        if not self.content:
            self.content = self.title
            status("WARN", "No notification content provided, setting content to title", self.statface)
        if not self.expire_after:
            self.expire_after = float(config_read(section="notifications", key="defaultexpiretime"))
            status("WARN", "No notification expire after time provided, setting to default", self.statface)

        self.cur.execute(
            "INSERT INTO notifications (notification_id, target_id, title, content, time_created, expire_after) VALUES (?, ?, ?, ?, ?, ?)",
            (notification_id, self.target_id, self.title, self.content,
             time_created, self.expire_after))
        self.db.commit()
        status("INFO", "Notification successfully created", self.statface)

        # load_notification re-points notification_id at the new row; put the
        # caller's value back afterwards.
        pre_load_notification_id = self.notification_id
        self.load_notification({'notification_id': notification_id})
        self.notification_id = pre_load_notification_id

    def delete(self, data=None):
        """Delete by notification id, else by target group, else by user."""
        if dict_key_verify(data, 'target_id'):
            self.target_id = data['target_id']
        if dict_key_verify(data, 'notification_id'):
            self.notification_id = data['notification_id']

        if self.notification_id:
            self.delete_notification(data)
        elif self.target_id:
            self.delete_group(data)
        elif self.id:
            self.delete_user(data)
        else:
            status("FAIL", "Unable to delete notification, invalid user or target/notification ID provided", self.statface)

    def delete_user(self, data=None):
        """Remove every queued notification belonging to the current user.

        Only this user's ``notifications_sent`` rows are removed. The original
        delegated to ``delete_notification`` on an object with no id, so
        nothing was ever deleted.
        NOTE(review): confirm user-scoped deletion is the intended semantics.
        """
        if not self.id:
            status("FAIL", "Unable to delete notification(s), invalid user provided", self.statface)
            return
        self.cur.execute(
            "SELECT notification_id FROM notifications_sent WHERE user_id=?",
            (self.id,))
        if not self.cur.fetchall():
            status("FAIL", "Unable to delete notification(s), something went wrong", self.statface)
            return
        self.cur.execute("DELETE FROM notifications_sent WHERE user_id=?",
                         (self.id,))
        self.db.commit()
        status("INFO", "Succesfully deleted notification", self.statface)

    def delete_group(self, data=None):
        """Clear the delivery queue of every notification sent to ``target_id``."""
        if not self.target_id:
            status("FAIL", "Notification(s) unable to be deleted, invalid target ID provided", self.statface)
            return
        self.cur.execute(
            "SELECT notification_id FROM notifications WHERE target_id=?",
            (self.target_id,))
        rez = self.cur.fetchall()
        if not rez:
            status("FAIL", "Notification(s) unable to be deleted somethign went wrong", self.statface)
            return
        for notif in rez:
            notification(notification_id=notif[0]).delete_notification()

    def delete_notification(self, data=None):
        """Remove the current notification from every recipient's queue.

        *data* may carry ``notification_id`` (the original ignored it).
        NOTE(review): the row in ``notifications`` itself is left in place,
        as in the original - confirm that is intended.
        """
        if dict_key_verify(data, 'notification_id'):
            self.notification_id = data['notification_id']
        if self.notification_id:
            self.cur.execute(
                "DELETE FROM notifications_sent WHERE notification_id=?",
                (self.notification_id,))
            self.db.commit()
            status("INFO", "Succesfully deleted notification", self.statface)
        else:
            status("FAIL", "Unable to delete notification, invali notification ID provided", self.statface)

    def remove(self, data=None):
        """Remove the current notification from the current user's queue only."""
        if dict_key_verify(data, 'notification_id'):
            self.notification_id = data['notification_id']
        if self.notification_id:
            self.cur.execute(
                "DELETE FROM notifications_sent WHERE user_id = ? AND notification_id=?",
                (self.id, self.notification_id,))
            self.db.commit()
            status("INFO", "Notification successfully removed", self.statface)
        else:
            status("FAIL", "Notification unable to be removed, invalid notification ID provided", self.statface)


class post(user_content):
    """An image post; ``posts`` is queried by ``user_id`` and ``date``."""

    def __init__(self, user_id=None, username=None, occupation_id=None,
                 team_id=None, post_id=None, content=None, caption=None,
                 *args, **kwargs):
        self.allowed_columns = ['post_id', 'content', 'caption', 'username',
                                'date']
        super().__init__(user_id=user_id, username=username,
                         occupation_id=occupation_id, team_id=team_id,
                         post_id=post_id, content=content, caption=caption)

    @property
    def caption(self):
        return self._caption

    @caption.setter
    def caption(self, value):
        self._caption = value if isinstance(value, str) else None

    @property
    def content(self):
        return self._content

    @content.setter
    def content(self, value):
        """Decode *value* as image bytes, save it, and store the saved path.

        Tries ``png`` then ``jpg``. Stores ``None`` when the bytes cannot be
        decoded or saved in either format.
        """
        save_path = None
        if value is not None:
            for form in ('png', 'jpg'):
                try:
                    candidate = f"data/images/post_{self.id}_{self.date}.{form}"
                    with Image.open(io.BytesIO(value)) as recieved:
                        recieved.save(candidate)
                except Exception:
                    # Best effort: fall through to the next format. Unlike
                    # the original bare except, KeyboardInterrupt and
                    # SystemExit are no longer swallowed.
                    continue
                save_path = candidate
                break
        self._content = save_path

    def get_feed(self):
        """Return friends' posts followed by team posts in one list.

        Fixed: the original rebuilt the result for each feed, so only the
        last feed was returned. A user who is both friend and team-mate
        appears twice.
        """
        info = {"posts": None}
        combined = []
        have_feed = False

        friend_posts_info = self.get_friends()
        if dict_key_verify(friend_posts_info, "posts"):
            combined.extend(friend_posts_info['posts'])
            have_feed = True
            status("INFO", "Succesfully fetched friends' post(s)", self.statface)

        team_posts_info = self.get_team()
        if dict_key_verify(team_posts_info, "posts"):
            combined.extend(team_posts_info['posts'])
            have_feed = True
            status("INFO", "Succesfully fetched team's post(s)", self.statface)

        if have_feed:
            info["posts"] = combined
        return info

    def get(self):
        """Return ``{"posts": {column: value}}`` for the current post.

        Each column is queried separately so that only what was requested is
        fetched. ``username`` is read as ``user_id`` and translated;
        ``content`` is returned as the stored file's bytes.
        """
        info = {"posts": {column: None for column in self.columns}}
        if not self.columns:
            status("WARN", "No post data requested to be fetched, check your inputs", self.statface)
        # NOTE(review): column names are interpolated into the SQL text;
        # presumably self.columns is restricted to allowed_columns - confirm.
        for column in self.columns:
            if column == "username":
                column = "user_id"
            self.cur.execute(f"SELECT {column} FROM posts WHERE post_id=?",
                             (self.post_id,))
            rez = self.cur.fetchone()
            if not rez:
                # Fixed: the original indexed rez before this check.
                status("FAIL", "Could not fetch post data, invalid data provided", self.statface)
                continue
            append_item = rez[0]
            if column == "user_id":
                column = "username"
                append_item = auth(user_id=append_item).get()['username']
            elif column == "content" and append_item:
                try:
                    with open(append_item, "rb") as stored:
                        append_item = stored.read()
                except OSError:
                    status("FAIL", "Could not fetch post data, invalid data provided", self.statface)
                    continue
            info["posts"][column] = append_item
            status("INFO", f"Succesfully fetched {column} for requested post", self.statface)
        return info

    def get_memories(self):
        """Return every post the current user has made."""
        info = {"posts": None}
        self.cur.execute("SELECT post_id FROM posts WHERE user_id=?",
                         (self.id,))
        rez = self.cur.fetchall()
        if rez:
            memories = []
            for post_details in rez:
                post_info = post(post_id=post_details[0])
                post_info.columns = self.columns
                memories.append(post_info.get()['posts'])
            info = {"posts": memories}
            status("INFO", "Succesfully fetched post memories", self.statface)
        elif self.id:
            status("WARN", "No memories exist", self.statface)
        if not self.id:
            status("FAIL", "Could not fetch memory data, invalid data provided or none exist", self.statface)
        return info

    def get_user(self):
        """Return the current user's post for ``self.date``.

        Returns ``{"posts": None}`` when there is none and ``None`` when the
        user or date is missing.
        """
        if not self.id or not self.date:
            status("FAIL", "No posts fetched, invalid inputs", self.statface)
            return None
        self.cur.execute(
            "SELECT post_id FROM posts WHERE user_id=? AND date=?",
            (self.id, self.date))
        rez = self.cur.fetchone()
        if rez:
            post_info = post(post_id=rez[0])
            post_info.columns = self.columns
            info = post_info.get()
            status("INFO", "Post(s) Succesfully fetched", self.statface)
        else:
            info = {"posts": None}
            status("WARN", "No post(s) exist for that user", self.statface)
        return info

    def _collect_user_posts(self, people):
        """Return what ``get_user`` finds for each entry's ``username``.

        The result keeps the order of *people*; entries with nothing found
        are ``None``.
        """
        collected = []
        for person in people:
            person_posts = post(username=person['username'])
            person_posts.columns = self.columns
            fetched = person_posts.get_user()
            collected.append(fetched['posts'] if fetched else None)
        return collected

    def get_friends(self):
        """Return one entry per friend of the current user (see ``get_user``)."""
        if not self.id:
            status("FAIL", "Could not fetch friend(s) post(s), invalid data provided", self.statface)
            return None
        info = {'posts': None}
        friends = info_friends(user_id=self.id).get()
        if dict_key_verify(friends, 'friends'):
            info = {"posts": self._collect_user_posts(friends['friends'])}
            status("INFO", "Succesfully fetched friend(s) post(s)", self.statface)
        else:
            status("WARN", "Could not fetch friend(s) post(s), no friends exist", self.statface)
        return info

    def get_team(self):
        """Return one entry per member of the current user's team."""
        info = {"posts": None}
        members_data = info_team(user_id=self.id, username=self.username,
                                 occupation_id=self.occupation_id,
                                 team_id=self.team_id)
        members_info = members_data.get_members()
        if members_info:
            members = members_info['members']
        else:
            status("WARN", "Team posts unable to be fetched, no other team members", self.statface)
            members = None
            info = None

        if not members:
            status("WARN", "Team posts unable to be fetched, no team members provided", self.statface)
            return info

        info = {"posts": self._collect_user_posts(members)}
        if not members_data.team_id:
            status("FAIL", "Team posts unable to be fetched, invalid data provided", self.statface)
            info = None
        else:
            status("INFO", "Team posts Succesfully fetched", self.statface)
        return info

    def get_permissions(self):
        """Return what the current user may do to the current post.

        ``delete`` is granted to management, admins, the post's author and the
        author's team leaders; ``edit`` to admins only. Returns ``None`` when
        the user or post id is invalid.
        """
        if not self.id or not self.post_id:
            status("FAIL", "Permissions could not be fetched, invalid data provided", self.statface)
            return None
        subject = info_auth(user_id=self.id, items=['level', 'username']).get()
        if not subject:
            status("FAIL", "Permissions could not be fetched, invalid data provided", self.statface)
            return None

        info = {"delete": False, "edit": False}
        if subject['level'] in ("management", "admin"):
            info['delete'] = True
        if subject['level'] == "admin":
            info['edit'] = True

        target_info = post(post_id=self.post_id, items=['username']).get()
        # get() nests its columns under 'posts'; the original looked for
        # 'username' on the wrapper dict, so the author checks never ran.
        if dict_key_verify(target_info, 'posts'):
            target_info = target_info['posts']
        if dict_key_verify(target_info, 'username'):
            target_username = target_info['username']
            if subject['username'] == target_username:
                info['delete'] = True
            target_team_info = info_team(username=target_username).get_leaders()
            if dict_key_verify(target_team_info, 'leaders'):
                if subject['username'] in target_team_info['leaders']:
                    info['delete'] = True

        status("INFO", "Permissions succesfully fetched", self.statface)
        return info

    def set(self, data=None):
        """Create the current user's post for ``self.date``.

        *data* must carry ``content`` (image bytes) and may carry ``caption``
        (at most 100 characters). Nothing is created when a post already
        exists for that date, when the time is not valid, or when a supplied
        caption is rejected.
        """
        self.caption = None
        caption_intended = bool(dict_key_verify(data, 'caption'))
        if caption_intended:
            caption = data['caption']
            if isinstance(caption, str) and len(caption) <= 100:
                self.caption = caption
        # A caption that was supplied but rejected blocks the post.
        caption_ok = bool(self.caption) if caption_intended else True

        post_id = uuid_generate()
        valid_time = timestamp().is_valid_time()
        self.cur.execute(
            "SELECT post_id FROM posts WHERE user_id=? AND date=?",
            (self.id, self.date))
        already_posted = self.cur.fetchone()

        if self.id and not already_posted and valid_time and caption_ok:
            # The image is only saved once the post is known to be allowed.
            # The file name depends on user and date, so saving earlier (as
            # the original did) overwrote the existing post's image even when
            # the new post was rejected.
            self.content = data['content'] if dict_key_verify(data, 'content') else None
            if self.content:
                self.cur.execute(
                    "INSERT INTO posts (post_id, user_id, content, caption, date) VALUES (?, ?, ?, ?, ?)",
                    (post_id, self.id, self.content, self.caption, self.date))
                self.db.commit()
                status("INFO", "Post successfully created", self.statface)
                return
        status("FAIL", "Post could not be created, invalid data provided", self.statface)

    def delete(self):
        """Delete the current post, or the user's post for ``self.date``."""
        if self.id and self.date:
            self.cur.execute(
                "SELECT post_id FROM posts WHERE user_id=? AND date=?",
                (self.id, self.date))
            rez = self.cur.fetchone()
            if rez and not self.post_id:
                self.post_id = rez[0]
        if self.post_id:
            self.cur.execute("DELETE FROM posts WHERE post_id=?",
                             (self.post_id,))
            self.db.commit()
            status("INFO", "Post successfully deleted", self.statface)
        else:
            status("FAIL", "Post could not be deleted, invalid data provided", self.statface)

    @staticmethod
    def _impression_count(entry):
        """Return the entry's stored impression count, or 0 if it has none."""
        if isinstance(entry, dict):
            return entry.get('impression_count', 0)
        return 0

    def _sort(self, posts):
        """Return *posts* in ascending order of like count (merge sort).

        Each post dict that has a ``post_id`` gains an ``impression_count``
        key. Counts are looked up once, not at every recursion level.
        """
        for entry in posts:
            if dict_key_verify(entry, "post_id"):
                counted = post_impression(
                    post_id=entry['post_id'],
                    impression_type='like').count({'impression_type': 'like'})
                entry['impression_count'] = counted['impression_count'] if counted else 0
        return self.__merge_sort(posts)

    def __merge_sort(self, posts):
        """Recursive half of ``_sort``; counts are already attached."""
        if len(posts) < 2:
            return posts
        mid = len(posts) // 2
        return self.__merge(left=self.__merge_sort(posts[:mid]),
                            right=self.__merge_sort(posts[mid:]))

    def __merge(self, left, right):
        """Merge two lists already sorted by impression count."""
        result = []
        index_left = index_right = 0
        while index_left < len(left) and index_right < len(right):
            left_count = self._impression_count(left[index_left])
            right_count = self._impression_count(right[index_right])
            if left_count <= right_count:
                result.append(left[index_left])
                index_left += 1
            else:
                result.append(right[index_right])
                index_right += 1
        result += left[index_left:]
        result += right[index_right:]
        # Fixed: the original never returned the merged list.
        return result


class comment(user_content):
    """A comment attached to a post."""

    def __init__(self, user_id=None, username=None, occupation_id=None,
                 team_id=None, comment_id=None, post_id=None, content=None,
                 *args, **kwargs):
        self.allowed_columns = ['comment_id', 'post_id', 'username', 'content']
        super().__init__(user_id=user_id, username=username,
                         occupation_id=occupation_id, team_id=team_id,
                         comment_id=comment_id, post_id=post_id,
                         content=content)

    def get(self):
        """Return ``{'comments': {column: value}}`` for the current comment."""
        if not self.comment_id:
            status("FAIL", "Comment unable to be fetched, invalid commentID provided", self.statface)
            return None
        info = {'comments': {column: None for column in self.columns}}
        if not self.columns:
            status("WARN", "No data requested to be fetched, check inputs", self.statface)
        for column in self.columns:
            if column == "username":
                column = "user_id"
            self.cur.execute(
                f"SELECT {column} FROM comments WHERE comment_id = ?",
                (self.comment_id,))
            rez = self.cur.fetchone()
            if not rez:
                status("FAIL", f"Comment {column} unable to be fetched, something went wrong", self.statface)
                continue
            comment_info = rez[0]
            if column == "user_id":
                column = "username"
                comment_info = auth(user_id=comment_info).get()['username']
            info['comments'][column] = comment_info
        status("INFO", "Succesfully fetched comment", self.statface)
        return info

    def get_post(self):
        """Return every comment on the current post.

        ``comment_id`` is restored afterwards. The original left it pointing
        at the last comment fetched, so a later ``delete()`` would have
        removed that comment.
        """
        if not self.post_id:
            status("FAIL", "Comment(s) unable to be fetched, invalid postID provided", self.statface)
            return None
        info = {'comments': None}
        self.cur.execute("SELECT comment_id FROM comments WHERE post_id=?",
                         (self.post_id,))
        rez = self.cur.fetchall()
        if not rez:
            status("WARN", "No comments related to requested post", self.statface)
            return info

        own_comment_id = self.comment_id
        fetched_comments = []
        for row in rez:
            self.comment_id = row[0]
            data = self.get()
            fetched_comments.append(data['comments'] if data else None)
        self.comment_id = own_comment_id

        info['comments'] = fetched_comments
        status("INFO", "Succesfully fetched comment(s)", self.statface)
        return info

    def get_permissions(self):
        """Return what the current user may do to the current comment.

        The rules match ``post.get_permissions``: ``delete`` for management,
        admins, the author and the author's team leaders; ``edit`` for admins.
        """
        if not self.id or not self.comment_id:
            status("FAIL", "Unable to get comment permissions, invalid user or commentID provided", self.statface)
            return None
        subject = info_auth(user_id=self.id, items=['level', 'username']).get()
        if not subject:
            status("FAIL", "Unable to get comment permissions, invalid user or commentID provided", self.statface)
            return None

        info = {"delete": False, "edit": False}
        if subject['level'] in ("management", "admin"):
            info['delete'] = True
        if subject['level'] == "admin":
            info['edit'] = True

        target_info = comment(comment_id=self.comment_id, items=['username']).get()
        # get() nests its columns under 'comments'.
        if dict_key_verify(target_info, 'comments'):
            target_info = target_info['comments']
        if dict_key_verify(target_info, 'username'):
            target_username = target_info['username']
            if subject['username'] == target_username:
                info['delete'] = True
            target_team_info = info_team(username=target_username).get_leaders()
            if dict_key_verify(target_team_info, 'leaders'):
                if subject['username'] in target_team_info['leaders']:
                    info['delete'] = True

        status("INFO", "Succesfully fetched comment permissions", self.statface)
        return info

    def set(self, data=None):
        """Create a comment on the current post and notify the post's author."""
        comment_id = uuid_generate()
        if dict_key_verify(data, 'content'):
            self.content = data['content']
        if not (self.content and self.post_id and self.id):
            status("FAIL", "Could not create comment, invalid content, postID or user provided", self.statface)
            return

        self.cur.execute(
            "INSERT INTO comments (post_id, comment_id, user_id, content) VALUES (?,?,?,?)",
            (self.post_id, comment_id, self.id, self.content))
        self.db.commit()
        status("INFO", "Succesfully created comment", self.statface)

        post_info = post(post_id=self.post_id).get()['posts']
        if dict_key_verify(post_info, 'username'):
            # Keyword argument, matching the other info_user_id call in this
            # file; the original passed the username positionally.
            poster = info_user_id(username=post_info['username']).get()
            if dict_key_verify(poster, 'user_id'):
                notif_data = {'target_id': poster['user_id'],
                              'title': "New comment on post",
                              'content': "Someone commented on your post!"}
                notification().create(notif_data)

    def delete(self):
        """Delete the current comment."""
        if self.comment_id:
            self.cur.execute("DELETE FROM comments WHERE comment_id=?",
                             (self.comment_id,))
            self.db.commit()
            status("INFO", "Succesfully deleted comment", self.statface)
        else:
            status("FAIL", "Could not delete comment, invalid commendID provided", self.statface)


class impression(user_content):
    """Shared logic for post and comment impressions (e.g. likes).

    The table and id-column names are held in attributes because the
    subclasses differ only in those. They are interpolated into the SQL text,
    which is safe because the setters only accept the fixed names listed
    below - they never come from user input.
    """

    def __init__(self, user_id=None, username=None, comment_id=None,
                 post_id=None, impression_id=None, impression_type=None,
                 table_name=None, attr_name=None, attr_id=None,
                 *args, **kwargs):
        if not hasattr(self, "allowed_columns"):
            self.allowed_columns = ['impression_id', 'username', 'type']
        super().__init__(user_id=user_id, username=username, post_id=post_id,
                         comment_id=comment_id)
        # hasattr is False while the backing "_name" attribute is unset, so
        # values a subclass assigned before calling in are kept.
        if not hasattr(self, "types"):
            self.types = ['like']
        if not hasattr(self, "table_name"):
            self.table_name = table_name
        if not hasattr(self, "attr_name"):
            self.attr_name = attr_name
        if not hasattr(self, "attr_id"):
            self.attr_id = attr_id
        # Fixed: impression_type was accepted but never stored, so count()
        # and set() raised AttributeError.
        self.impression_type = impression_type
        # Validated last because the check needs table_name.
        self.impression_id = impression_id

    @property
    def impression_id(self):
        return self._impression_id

    @impression_id.setter
    def impression_id(self, value):
        """Keep *value* only if that impression exists in ``table_name``."""
        table_name = getattr(self, '_table_name', None)
        if table_name:
            self.cur.execute(
                f"SELECT impression_id FROM {table_name} WHERE impression_id = ?",
                (value,))
            if not self.cur.fetchone():
                value = None
        else:
            value = None
        self._impression_id = value

    @property
    def impression_type(self):
        return self._impression_type

    @impression_type.setter
    def impression_type(self, value):
        self._impression_type = value if value in self.types else None

    @property
    def table_name(self):
        return self._table_name

    @table_name.setter
    def table_name(self, value):
        """Accept only the two impression tables; derive ``attr_name``."""
        if value not in ("post_impressions", "comment_impressions"):
            value = None
        self._table_name = value
        if value:
            self.attr_name = value.replace("impressions", "id")

    @property
    def attr_name(self):
        return self._attr_name

    @attr_name.setter
    def attr_name(self, value):
        """Accept only ``post_id``/``comment_id``; derive ``table_name`` if unset."""
        if value not in ("post_id", "comment_id"):
            value = None
        self._attr_name = value
        if not self.table_name and value:
            self.table_name = value.replace("id", "impressions")

    @property
    def attr_id(self):
        return self._attr_id

    @attr_id.setter
    def attr_id(self, value):
        """Keep *value* only if the post/comment named by ``attr_name`` exists."""
        if self.attr_name:
            root_table = "posts" if "post" in self.attr_name else "comments"
            self.cur.execute(
                f"SELECT user_id FROM {root_table} WHERE {self.attr_name} = ?",
                (value,))
            if not self.cur.fetchone():
                value = None
        else:
            value = None
        self._attr_id = value

    def get(self):
        """Return ``{'impressions': [{column: value}]}`` for the current impression."""
        if not self.impression_id:
            status("FAIL", "Impressions could not be fetched, invalid impressionID provided", self.statface)
            return None
        info = {'impressions': [{column: None for column in self.columns}]}
        if not self.columns:
            status("WARN", "No data requested to be fetched, check inputs", self.statface)
        for column in self.columns:
            if column == "username":
                column = "user_id"
            self.cur.execute(
                f"SELECT {column} FROM {self.table_name} WHERE impression_id=?",
                (self.impression_id,))
            rez = self.cur.fetchone()
            if not rez:
                status("FAIL", f"Impression {column} could not be fetched, something went wrong", self.statface)
                continue
            rez_info = rez[0]
            if column == "user_id":
                column = "username"
                rez_info = auth(user_id=rez_info, items=['username']).get()['username']
            info['impressions'][0][column] = rez_info
        status("INFO", "Impressions succesfully fetched", self.statface)
        return info

    def get_content(self):
        """Return the current user's impressions on the current post/comment.

        Returns ``None`` when the post/comment id is invalid.
        """
        info = {'impressions': None}
        if not self.attr_name:
            status("FAIL", "Impression(s) unable to be fetched, impression type unspecified", self.statface)
        elif self.attr_id:
            self.cur.execute(
                f"SELECT impression_id FROM {self.table_name} WHERE user_id=? AND {self.attr_name}=?",
                (self.id, self.attr_id))
            rez = self.cur.fetchall()
            if rez:
                # Subclasses set class_type; the base class falls back to its
                # own type and hands over the table name it needs.
                factory = getattr(self, 'class_type', type(self))
                found = []
                for row in rez:
                    impression_info = factory(table_name=self.table_name)
                    impression_info.impression_id = row[0]
                    impression_info.columns = self.columns
                    fetched = impression_info.get()
                    found.append(fetched['impressions'][0] if fetched else None)
                info['impressions'] = found
            else:
                status("WARN", "Impression(s) unable to be fetched, somethign went wrong", self.statface)

        if not self.attr_id:
            status("FAIL", "Impression(s) unable to be fetched, invalid post/comment ID provided", self.statface)
            return None
        if info['impressions'] is not None:
            status("INFO", "Succesfully fetched impression(s)", self.statface)
        return info

    def count(self, data=None):
        """Return ``{'impression_count': n}`` for the current type and content.

        *data* may carry ``impression_type``; it is used only when no type is
        already set. Returns ``None`` when the type is invalid or the table or
        column names are unset.
        """
        info = {'impression_count': 0}
        if dict_key_verify(data, "impression_type") and not self.impression_type:
            self.impression_type = data['impression_type']
        if not self.impression_type:
            status("FAIL", "Impression count unable to be fetched, invalid impression type", self.statface)
            return None
        if not (self.table_name and self.attr_name):
            # Without these the SQL below would name a table "None".
            status("FAIL", "Impression count unable to be fetched, something went wrong", self.statface)
            return None

        self.cur.execute(
            f"SELECT COUNT(*) FROM {self.table_name} WHERE type = ? AND {self.attr_name} = ?",
            (self.impression_type, self.attr_id))
        rez = self.cur.fetchall()
        if rez:
            info['impression_count'] = rez[0][0]
            status("INFO", "Succesfully fetched impression count", self.statface)
        else:
            status("FAIL", "Impression count unable to be fetched, something went wrong", self.statface)
        return info

    def set(self, data=None):
        """Record an impression by the current user, at most one per type."""
        if dict_key_verify(data, "impression_type"):
            self.impression_type = data['impression_type']
        if not (self.impression_type and self.id and self.attr_id):
            status("FAIL", "Impression unable to be created, invalid impression type, user or post/comment ID provided", self.statface)
            return

        self.cur.execute(
            f"SELECT type FROM {self.table_name} WHERE user_id=? AND type=? AND {self.attr_name}=?",
            (self.id, self.impression_type, self.attr_id))
        if self.cur.fetchone():
            # Fixed: the original logged an "already exists" warning in the
            # opposite case, when no such row was found.
            status("FAIL", "Impression unable to be created, impression already exists", self.statface)
            return

        impression_id = uuid_generate()
        self.cur.execute(
            f"INSERT INTO {self.table_name} (impression_id, {self.attr_name}, user_id, type) VALUES (?, ?, ?, ?)",
            (impression_id, self.attr_id, self.id, self.impression_type))
        self.db.commit()
        status("INFO", "Impression succesfully created", self.statface)

    def delete(self, data=None):
        """Delete the current impression; *data* may carry ``impression_id``."""
        if dict_key_verify(data, "impression_id"):
            self.impression_id = data['impression_id']
        if self.impression_id:
            self.cur.execute(
                f"DELETE FROM {self.table_name} WHERE impression_id=?",
                (self.impression_id,))
            self.db.commit()
            status("INFO", "Succesfully deleted impression", self.statface)
        else:
            status("FAIL", "Impression unable to be deleted, invalid impression ID provided", self.statface)


class post_impression(impression):
    """Impressions stored in ``post_impressions``, keyed by ``post_id``."""

    def __init__(self, user_id=None, username=None, post_id=None,
                 impression_type=None, *args, **kwargs):
        self.allowed_columns = ['impression_id', 'post_id', 'username', 'type']
        self.types = ['like']
        # Assigning table_name also derives attr_name ("post_id").
        self.table_name = "post_impressions"
        self.class_type = post_impression
        super().__init__(user_id=user_id, username=username, post_id=post_id,
                         impression_type=impression_type)
        if self.post_id:
            self.attr_id = self.post_id

    @property
    def post_id(self):
        return self._post_id

    @post_id.setter
    def post_id(self, value):
        """Validate the post id and mirror it into ``attr_id``."""
        self.cur.execute("SELECT content FROM posts WHERE post_id = ?",
                         (value,))
        if not self.cur.fetchone():
            value = None
        self._post_id = value
        self.attr_id = value

    def get_post(self):
        """Return the current user's impressions on the current post."""
        return self.get_content()


class comment_impression(impression):
    """Impressions stored in ``comment_impressions``, keyed by ``comment_id``."""

    def __init__(self, user_id=None, username=None, comment_id=None,
                 impression_type=None, *args, **kwargs):
        # Fixed: the original assigned "allow_columns", so the base-class
        # default (without comment_id) was used instead.
        self.allowed_columns = ['impression_id', 'comment_id', 'username',
                                'type']
        self.types = ['like']
        # Assigning table_name also derives attr_name ("comment_id").
        self.table_name = "comment_impressions"
        self.class_type = comment_impression
        super().__init__(user_id=user_id, username=username,
                         comment_id=comment_id,
                         impression_type=impression_type)
        if self.comment_id:
            self.attr_id = self.comment_id

    @property
    def comment_id(self):
        return self._comment_id

    @comment_id.setter
    def comment_id(self, value):
        """Validate the comment id and mirror it into ``attr_id``."""
        self.cur.execute("SELECT content FROM comments WHERE comment_id=?",
                         (value,))
        if not self.cur.fetchone():
            value = None
        self._comment_id = value
        self.attr_id = value

    def get_comment(self):
        """Return the current user's impressions on the current comment."""
        return self.get_content()