1078 lines
44 KiB
Python
1078 lines
44 KiB
Python
from modules.user.info import table, auth
|
|
from modules.user.info import user_id as info_user_id
|
|
from modules.user.info import auth as info_auth
|
|
from modules.user.info import friend as info_friends
|
|
from modules.user.info import team as info_team
|
|
|
|
from modules.algorithms.uuid import generate as uuid_generate
|
|
from modules.algorithms.univ import dict_key_verify
|
|
|
|
from modules.data.config import read as config_read
|
|
from modules.data.database import connect as db_connect
|
|
from modules.data.datetime import timestamp
|
|
|
|
from modules.track.logging import status
|
|
|
|
from PIL import Image
|
|
import io
|
|
|
|
class user_content(table):
    """Shared state and validation for user-generated content.

    Holds the identifiers (user, post, comment, occupation) and the content
    payload that subclasses operate on.  Every ID assigned through one of the
    properties below is looked up in the database and stored as ``None`` when
    no matching row exists.

    NOTE(review): ``self.cur``, ``self.date`` and ``self.team_id`` are used
    here but not defined in this class -- presumably supplied by ``table``;
    confirm against that base class.
    """

    def __init__(self, user_id=None, username=None, occupation_id=None, team_id=None, comment_id=None, post_id=None, content=None, caption=None, *args, **kwargs):
        """Initialise the shared content attributes.

        Subclasses may assign ``allowed_columns`` before delegating here; the
        default list is only installed when none has been set yet.
        """
        # ``getattr`` with a default avoids an AttributeError when the class is
        # instantiated directly and nothing has defined ``allowed_columns``.
        if not getattr(self, "allowed_columns", None):
            self.allowed_columns = ['post_id', 'content', 'caption', 'username', 'team_id', 'date']
        super().__init__(user_id=user_id, username=username, occupation_id=occupation_id, team_id=team_id, post_id=post_id, content=content)
        self.post_id = post_id
        self.comment_id = comment_id
        self.content = content
        self.caption = caption

    @property
    def id(self):
        """User ID, or ``None`` when the assigned value was not a known user."""
        return self._id

    @id.setter
    def id(self, value):
        if isinstance(value, str):
            self.cur.execute("SELECT username FROM auth_credentials WHERE user_id = ?", (value,))
            if self.cur.fetchone():
                # Known user: also pick up the post made on ``self.date``, if any.
                self.cur.execute("SELECT post_id FROM posts WHERE user_id=? AND date=?", (value, self.date))
                rez = self.cur.fetchone()
                if rez:
                    self.post_id = rez[0]
            else:
                value = None
        else:
            value = None
        self._id = value

    @property
    def post_id(self):
        """Post ID, or ``None`` when no such post exists."""
        return self._post_id

    @post_id.setter
    def post_id(self, value):
        self.cur.execute("SELECT content FROM posts WHERE post_id = ?", (value,))
        if not self.cur.fetchone():
            value = None
        self._post_id = value

    @property
    def comment_id(self):
        """Comment ID, or ``None`` when no such comment exists."""
        return self._comment_id

    @comment_id.setter
    def comment_id(self, value):
        self.cur.execute("SELECT content FROM comments WHERE comment_id=?", (value,))
        if not self.cur.fetchone():
            value = None
        self._comment_id = value

    @property
    def occupation_id(self):
        """Occupation ID, or ``None`` when no such occupation exists."""
        return self._occupation_id

    @occupation_id.setter
    def occupation_id(self, value):
        # Assigning an occupation also (re)derives ``team_id`` from the
        # ``teams`` table; it is reset to None when nothing matches.
        team_value = None
        self.cur.execute("SELECT name FROM occupations WHERE occupation_id = ?", (value,))
        if not self.cur.fetchone():
            value = None
        else:
            self.cur.execute("SELECT team_id FROM teams WHERE occupation_id = ?", (value,))
            rez = self.cur.fetchone()
            if rez:
                team_value = rez[0]

        self.team_id = team_value
        self._occupation_id = value

    @property
    def content(self):
        """Content payload; anything that is not a string is stored as ``None``."""
        return self._content

    @content.setter
    def content(self, value):
        if not isinstance(value, str):
            value = None
        self._content = value
|
|
|
|
class notification(table):
    """Create, queue, fetch and delete notifications.

    Notification records are stored in the ``notifications`` table; one row
    per recipient is queued in ``notifications_sent``.

    NOTE(review): ``self.cur``, ``self.db``, ``self.id``, ``self.columns``,
    ``self.server_code`` and ``self.statface`` are used here but not defined
    in this class -- presumably supplied by ``table``; confirm there.
    """

    # Permission levels that are accepted as a notification target.
    _LEVELS = ('member', 'management', 'admin')

    @property
    def notification_id(self):
        """Notification ID, or ``None`` when no such notification exists."""
        return self._notification_id

    @notification_id.setter
    def notification_id(self, value):
        self.cur.execute("SELECT notification_id FROM notifications WHERE notification_id=?", (value,))
        if not self.cur.fetchone():
            value = None
        self._notification_id = value

    @property
    def target_id(self):
        """Target of the notification.

        Accepted values are a user ID, a team ID, ``"all-<server code>"`` or
        one of the permission levels; anything else is stored as ``None``.
        """
        return self._target_id

    @target_id.setter
    def target_id(self, value):
        self.cur.execute("SELECT user_id FROM auth_credentials WHERE user_id=?", (value,))
        user = self.cur.fetchone()
        self.cur.execute("SELECT team_id FROM teams WHERE team_id=?", (value,))
        team = self.cur.fetchone()
        all_server = value == "all-" + self.server_code
        level = value in self._LEVELS

        if not (user or team or all_server or level):
            value = None
        self._target_id = value

    @property
    def title(self):
        """Notification title; non-string values are stored as ``None``."""
        return self._title

    @title.setter
    def title(self, value):
        if not isinstance(value, str):
            value = None
        self._title = value

    @property
    def content(self):
        """Notification body; non-string values are stored as ``None``."""
        return self._content

    @content.setter
    def content(self, value):
        if not isinstance(value, str):
            value = None
        self._content = value

    @property
    def expire_after(self):
        """Expiry value; only plain ``int``/``float`` values are accepted."""
        return self._expire_after

    @expire_after.setter
    def expire_after(self, value):
        # Exact type check on purpose: ``bool`` is an ``int`` subclass and
        # must keep being rejected, as in the original implementation.
        if type(value) not in (float, int):
            value = None
        self._expire_after = value

    def __init__(self, user_id=None, username=None, notification_id=None, target_id=None, title=None, content=None, expire_after=None):
        """Set the allowed columns, initialise the base class and validate fields."""
        self.allowed_columns = ['notification_id', 'target_id', 'title', 'content', 'time_created']
        super().__init__(user_id=user_id, username=username)
        self.notification_id = notification_id
        self.target_id = target_id
        self.title = title
        self.content = content
        self.expire_after = expire_after

    def get_target_group(self, data=None):
        """Classify ``self.target_id`` as a server, level, user or team target.

        ``data['target_id']``, when supplied, replaces ``self.target_id``
        first.  Returns ``{'type': ..., 'id': ...}`` where ``type`` is one of
        ``'server'``, ``'user'``, ``'team'``, ``'level'`` (or ``None`` when no
        lookup matched), and ``None`` when there is no valid target ID.  For a
        user target ``id`` is the username; otherwise it is the target ID.
        """
        if dict_key_verify(data, 'target_id'):
            self.target_id = data['target_id']

        if not self.target_id:
            return None

        if self.target_id == "all-" + self.server_code:
            # "all-<server code>" is the unique way of addressing the entire
            # server.  It is structured this way to stop collisions with a user
            # who might call themselves "all" or "server"; as such the server
            # code (usually a string of numbers, 12345 by default) is banned
            # from use in usernames.
            return {'type': 'server', 'id': self.target_id}
        if self.target_id in self._LEVELS:
            return {'type': 'level', 'id': self.target_id}

        info = {'type': None, 'id': None}
        self.cur.execute("SELECT username FROM auth_credentials WHERE user_id = ?", (self.target_id,))
        if self.cur.fetchone():
            username = auth(user_id=self.target_id).get()['username']
            info = {'type': 'user', 'id': username}

        # A matching team takes precedence over a matching user, as before.
        self.cur.execute("SELECT name FROM teams WHERE team_id = ?", (self.target_id,))
        if self.cur.fetchone():
            info = {'type': 'team', 'id': self.target_id}
        return info

    def get_targets(self, data=None):
        """Resolve the target ID into the users that should be notified.

        The target is implicit: it is the server's job to work out what kind
        of ID was provided (see ``get_target_group``).  Returns
        ``{'targets': [{'user_id': ...}, ...]}``, ``{'targets': None}`` when
        nobody matched, or ``None`` when there is no valid target ID.
        """
        # Fixed: the original assigned the undefined name ``target_id`` here.
        if dict_key_verify(data, 'target_id'):
            self.target_id = data['target_id']

        if not self.target_id:
            return None

        info = {'targets': None}
        # Fixed: the original passed ``self`` as the ``data`` argument.
        target_group = self.get_target_group()
        if not target_group:
            return info

        group_type = target_group['type']
        if group_type == "user":
            info['targets'] = [{'user_id': info_user_id(username=target_group['id']).get()['user_id']}]
            return info

        if group_type == "server":
            self.cur.execute("SELECT user_id FROM profile")
        elif group_type == "team":
            self.cur.execute("SELECT user_id FROM profile INNER JOIN teams USING(occupation_id) WHERE team_id=?", (target_group['id'],))
        elif group_type == "level":
            self.cur.execute("SELECT user_id FROM auth_credentials WHERE level=?", (target_group['id'],))
        else:
            # Unclassified target: do not read rows left over from an
            # unrelated earlier query on the shared cursor.
            return info

        rez = self.cur.fetchall()
        if rez:
            info['targets'] = [{'user_id': target[0]} for target in rez]
        return info

    def get_unsent(self, data=None):
        """Return the user's queued, not-yet-sent notifications, newest first.

        Returns ``{'notifications': [...]}``, ``{'notifications': None}`` when
        nothing is queued, or ``None`` when ``self.id`` is not set.
        """
        info = {'notifications': None}

        self.cur.execute("SELECT notification_id, time_created FROM notifications INNER JOIN notifications_sent USING(notification_id) WHERE user_id=? AND sent=?", (self.id, False))
        rez = self.cur.fetchall()
        if rez:
            info['notifications'] = []
            for unsent in self._sort_notifications(rez):
                notification_info = notification(notification_id=unsent)
                notification_info.columns = self.columns
                fetched = notification_info.get_notification()
                if fetched:
                    info['notifications'].append(fetched['notifications'][0])

        if not self.id:
            info = None
        return info

    def _sort_notifications(self, notifs):
        """Return the notification IDs ordered by creation time, newest first.

        ``notifs`` is a sequence of ``(notification_id, time_created)`` rows.
        The original performed a single bubble-sort pass that also skipped the
        last pair, so the result was generally not sorted.  ``sorted`` is
        stable, so rows with equal timestamps keep their relative order.
        """
        ordered = sorted(notifs, key=lambda row: row[1], reverse=True)
        return [row[0] for row in ordered]

    def get(self, data=None):
        """Fetch by notification ID if one is set, otherwise by user.

        ``data`` may carry ``user_id``, ``target_id`` and ``notification_id``
        overrides.  Returns ``None`` when neither a notification nor a user is
        available.
        """
        info = None

        if dict_key_verify(data, 'user_id'):
            self.id = data['user_id']
        # NOTE(review): target_id is accepted here but no branch below uses it;
        # ``get_group`` has to be called directly for group lookups.
        if dict_key_verify(data, 'target_id'):
            self.target_id = data['target_id']
        if dict_key_verify(data, 'notification_id'):
            self.notification_id = data['notification_id']

        if self.notification_id:
            info = self.get_notification()
        elif self.id:
            info = self.get_user()
        else:
            status("WARN", "Unable to fetch notifications, must provide a valid user or notification ID", self.statface)

        return info

    def get_user(self):
        """Return every notification queued for ``self.id``.

        Returns ``{'notifications': [...]}``, ``{'notifications': None}`` when
        the user has none, or ``None`` when ``self.id`` is not set.
        """
        if not self.id:
            status("WARN", "Unable to fetch notifications, invalid user provided", self.statface)
            return None

        info = {'notifications': None}
        self.cur.execute("SELECT notification_id FROM notifications_sent WHERE user_id = ?", (self.id,))
        rez = self.cur.fetchall()
        if not rez:
            status("WARN", "No notifications exist for this user", self.statface)
            return info

        # The original attached this warning to a ``for ... else`` clause, so
        # it was logged after every successful fetch as well.
        info['notifications'] = []
        for notif in rez:
            user_notification = notification(notification_id=notif[0])
            user_notification.columns = self.columns
            fetched = user_notification.get_notification()
            if fetched:
                info['notifications'].append(fetched['notifications'][0])
        return info

    def get_group(self, data=None):
        """Return every notification addressed to ``self.target_id``.

        Returns ``{'notifications': [...]}``, ``{'notifications': None}`` when
        none exist, or ``None`` without a valid target ID.  The original
        always returned ``None`` because of a stray ``return None``.
        """
        if not self.target_id:
            return None

        info = {'notifications': None}
        self.cur.execute("SELECT notification_id FROM notifications WHERE target_id=?", (self.target_id,))
        rez = self.cur.fetchall()
        if rez:
            info['notifications'] = [{column: None for column in self.columns} for _ in rez]
            for i, notif in enumerate(rez):
                notification_info = notification(notification_id=notif[0])
                notification_info.columns = self.columns
                fetched = notification_info.get_notification()
                if fetched:
                    info['notifications'][i] = fetched['notifications'][0]
        return info

    def get_notification(self, data=None):
        """Fetch the requested columns of the notification ``self.notification_id``.

        Returns ``{'notifications': [{column: value, ...}]}`` or ``None`` when
        no valid notification ID is set.  For the ``target_id`` column the
        value is resolved through ``get_target_group`` (a username for user
        targets).

        NOTE(review): resolving ``target_id`` overwrites ``self.target_id``
        with the stored target, because ``get_target_group`` assigns it.
        """
        if not self.notification_id:
            status("WARN", "Notification unable to be fetched, invalid notification ID provided", self.statface)
            return None

        columns = self.columns
        record = {column: None for column in columns}
        if not columns:
            status("WARN", "No data requested to be fetched, check inputs", self.statface)

        for column in columns:
            # NOTE(review): the column name is interpolated into the SQL text;
            # this is only safe if ``self.columns`` is restricted to
            # ``allowed_columns`` elsewhere -- confirm in ``table``.
            self.cur.execute(f"SELECT {column} FROM notifications WHERE notification_id = ?", (self.notification_id,))
            rez = self.cur.fetchone()
            if not rez:
                status("WARN", f"Notification {column} unable to be fetched, something went wrong", self.statface)
                continue

            info_item = rez[0]
            if column == "target_id":
                # Fixed: the resolved value was computed but the raw value was
                # stored, and an unresolvable target raised TypeError.
                group = self.get_target_group({'target_id': info_item})
                if group and group['id'] is not None:
                    info_item = group['id']
            record[column] = info_item

        return {'notifications': [record]}

    def load_notification(self, data=None):
        """Queue a notification for each of its targets.

        Inserts one ``notifications_sent`` row per target user; this is where
        notifications wait until their target next logs in.  Does nothing
        without a valid notification ID or when the target resolves to nobody.
        """
        if dict_key_verify(data, 'notification_id'):
            self.notification_id = data['notification_id']

        if not self.notification_id:
            return

        notification_info = notification(notification_id=self.notification_id)
        notification_info.target_id = self.target_id
        # ``get_targets`` returns None for an invalid target; guard the lookup.
        target_info = notification_info.get_targets()
        target_data = target_info['targets'] if target_info else None
        if target_data:
            for target in target_data:
                self.cur.execute("INSERT INTO notifications_sent (notification_id, user_id) VALUES (?, ?)", (self.notification_id, target['user_id']))
            self.db.commit()

    def create(self, data=None):
        """Create a notification and queue it for its targets.

        ``data`` may carry ``target_id``, ``title``, ``content`` and
        ``expire_after``.  A title and a valid target are required; missing
        content defaults to the title and a missing expiry to the configured
        default.
        """
        if dict_key_verify(data, 'target_id'):
            self.target_id = data['target_id']
        if dict_key_verify(data, 'title'):
            self.title = data['title']
        if dict_key_verify(data, 'content'):
            self.content = data['content']
        if dict_key_verify(data, 'expire_after'):
            self.expire_after = data['expire_after']

        notification_id = uuid_generate()
        time_created = timestamp().now

        if not (self.title and self.target_id):
            status("FAIL", "Unable to create notification, invalid title or target ID provided", self.statface)
            return

        if not self.content:
            self.content = self.title
            status("WARN", "No notification content provided, setting content to title", self.statface)
        if not self.expire_after:
            self.expire_after = float(config_read(section="notifications", key="defaultexpiretime"))
            status("WARN", "No notification expire after time provided, setting to default", self.statface)

        self.cur.execute("INSERT INTO notifications (notification_id, target_id, title, content, time_created, expire_after) VALUES (?, ?, ?, ?, ?, ?)", (notification_id, self.target_id, self.title, self.content, time_created, self.expire_after))
        self.db.commit()
        status("INFO", "Notification successfully created", self.statface)

        # ``load_notification`` overwrites ``self.notification_id``; restore it
        # so creating a notification does not change this instance's ID.
        pre_load_notification_id = self.notification_id
        self.load_notification({'notification_id': notification_id})
        self.notification_id = pre_load_notification_id

    def delete(self, data=None):
        """Delete by notification ID, else by target, else by user."""
        if dict_key_verify(data, 'target_id'):
            self.target_id = data['target_id']
        if dict_key_verify(data, 'notification_id'):
            self.notification_id = data['notification_id']

        if self.notification_id:
            self.delete_notification(data)
        elif self.target_id:
            self.delete_group(data)
        elif self.id:
            self.delete_user(data)
        else:
            status("FAIL", "Unable to delete notification, invalid user or target/notification ID provided", self.statface)

    def delete_user(self, data=None):
        """Delete every notification that is queued for ``self.id``.

        NOTE(review): each notification is removed via ``delete_notification``,
        which clears its queue entries for *all* recipients, not just this
        user -- confirm that is intended (``remove`` is the per-user variant).
        """
        if not self.id:
            status("FAIL", "Unable to delete notification(s), invalid user provided", self.statface)
            return

        self.cur.execute("SELECT notification_id FROM notifications_sent WHERE user_id=?", (self.id,))
        rez = self.cur.fetchall()
        if not rez:
            status("WARN", "User has no notifications to be deleted", self.statface)
            return

        for notif in rez:
            notification().delete_notification({'notification_id': notif[0]})

    def delete_group(self, data=None):
        """Delete every notification addressed to ``self.target_id``."""
        if not self.target_id:
            status("FAIL", "Notification(s) unable to be deleted, invalid target ID provided", self.statface)
            return

        self.cur.execute("SELECT notification_id FROM notifications WHERE target_id=?", (self.target_id,))
        rez = self.cur.fetchall()
        if not rez:
            status("WARN", "Target(s) have no notifications to be deleted", self.statface)
            return

        for notif in rez:
            notification(notification_id=notif[0]).delete_notification()

    def delete_notification(self, data=None):
        """Remove all queue entries of one notification.

        ``data['notification_id']``, when supplied, selects the notification.
        The original ignored ``data``, which made ``delete_user`` (which
        passes the ID that way) a no-op.  Only ``notifications_sent`` rows are
        deleted; the ``notifications`` row itself is left in place.
        """
        if dict_key_verify(data, 'notification_id'):
            self.notification_id = data['notification_id']

        if self.notification_id:
            self.cur.execute("DELETE FROM notifications_sent WHERE notification_id=?", (self.notification_id,))
            self.db.commit()
            status("INFO", "Succesfully deleted notification", self.statface)
        else:
            status("FAIL", "Unable to delete notification, invali notification ID provided", self.statface)

    def remove(self, data=None):
        """Remove one notification from the queue of the user ``self.id`` only."""
        if self.notification_id:
            self.cur.execute("DELETE FROM notifications_sent WHERE user_id = ? AND notification_id=?", (self.id, self.notification_id,))
            self.db.commit()
            status("INFO", "Notification successfully removed", self.statface)
        else:
            status("FAIL", "Notification unable to be removed, invalid notification ID provided", self.statface)
|
|
|
|
class post(user_content):
    """An image post with an optional caption, at most one per user and date.

    NOTE(review): ``self.cur``, ``self.db``, ``self.id``, ``self.date``,
    ``self.username``, ``self.team_id``, ``self.columns`` and
    ``self.statface`` are used here but defined elsewhere -- presumably in
    ``table``; confirm there.
    """

    @property
    def caption(self):
        """Caption text; non-string values are stored as ``None``."""
        return self._caption

    @caption.setter
    def caption(self, value):
        if not isinstance(value, str):
            value = None
        self._caption = value

    @property
    def content(self):
        """Path of the saved image file, or ``None`` if saving failed."""
        return self._content

    @content.setter
    def content(self, value):
        """Decode ``value`` as image bytes and save it under ``data/images``.

        PNG is tried first, then JPG.  If the bytes cannot be decoded or
        saved in either format the stored path is ``None``.
        """
        save_path = None
        for form in ('png', 'jpg'):
            candidate = None
            try:
                candidate = f"data/images/post_{self.id}_{self.date}.{form}"
                with Image.open(io.BytesIO(value)) as recieved:
                    recieved.save(candidate)
            except Exception:
                # Deliberately best-effort (bad bytes, unsupported mode, I/O
                # error, attributes not initialised yet).  Narrowed from a bare
                # ``except`` so KeyboardInterrupt/SystemExit are not swallowed.
                continue
            save_path = candidate
            break
        self._content = save_path

    def __init__(self, user_id=None, username=None, occupation_id=None, team_id=None, post_id=None, content=None, caption=None, *args, **kwargs):
        """Set the post columns and delegate to ``user_content``."""
        self.allowed_columns = ['post_id', 'content', 'caption', 'username', 'date']
        super().__init__(user_id=user_id, username=username, occupation_id=occupation_id, team_id=team_id, post_id=post_id, content=content, caption=caption)

    def get_feed(self):
        """Return friends' and team members' posts combined into one list.

        Returns ``{"posts": [...]}`` or ``{"posts": None}`` when neither
        source produced anything.  The original re-created the result list for
        every feed, so friends' posts were lost whenever team posts existed.
        """
        post_feeds = []

        friend_posts_info = self.get_friends()
        if dict_key_verify(friend_posts_info, "posts"):
            post_feeds.append(friend_posts_info['posts'])
            status("INFO", "Succesfully fetched friends' post(s)", self.statface)

        team_posts_info = self.get_team()
        if dict_key_verify(team_posts_info, "posts"):
            post_feeds.append(team_posts_info['posts'])
            status("INFO", "Succesfully fetched team's post(s)", self.statface)

        if not post_feeds:
            return {"posts": None}

        feed = []
        for post_feed in post_feeds:
            feed.extend(post_feed)
        return {"posts": feed}

    def get(self):
        """Fetch the requested columns of the post ``self.post_id``.

        Returns ``{"posts": {column: value, ...}}``.  ``username`` is resolved
        from the stored ``user_id`` and ``content`` is returned as the bytes
        of the stored image file.
        """
        columns = self.columns
        info = {"posts": {column: None for column in columns}}
        if not columns:
            status("WARN", "No post data requested to be fetched, check your inputs", self.statface)

        for column in columns:
            # One query per column instead of fetching every field, so only
            # what was requested is read and no positional "magic numbers" are
            # needed to pick values out of a wider row.
            db_column = "user_id" if column == "username" else column
            # NOTE(review): the column name is interpolated into the SQL text;
            # safe only if ``self.columns`` is restricted to allowed columns.
            self.cur.execute(f"SELECT {db_column} FROM posts WHERE post_id=?", (self.post_id,))
            rez = self.cur.fetchone()
            # Fixed: the original indexed ``rez[0]`` before checking for None.
            if not rez:
                status("FAIL", "Could not fetch post data, invalid data provided", self.statface)
                continue

            append_item = rez[0]
            if db_column == "user_id":
                column = "username"
                append_item = auth(user_id=append_item).get()['username']
            elif db_column == "content":
                try:
                    with open(append_item, "rb") as image_file:
                        append_item = image_file.read()
                except (OSError, TypeError):
                    # Missing file or NULL path: report it rather than abort
                    # the whole fetch.
                    status("WARN", "Post image could not be read from disk", self.statface)
                    append_item = None
            info["posts"][column] = append_item
            status("INFO", f"Succesfully fetched {column} for requested post", self.statface)

        return info

    def get_memories(self):
        """Return every post ever made by ``self.id``.

        Returns ``{"posts": [...]}`` or ``{"posts": None}`` when there are
        none (or no valid user is set).
        """
        info = {"posts": None}
        self.cur.execute("SELECT post_id FROM posts WHERE user_id=?", (self.id,))
        rez = self.cur.fetchall()

        if rez:
            memories = []
            for post_details in rez:
                post_info = post(post_id=post_details[0])
                post_info.columns = self.columns
                memories.append(post_info.get()['posts'])
            info = {"posts": memories}
            status("INFO", "Succesfully fetched post memories", self.statface)
        elif self.id:
            status("WARN", "No memories exist", self.statface)

        if not self.id:
            status("FAIL", "Could not fetch memory data, invalid data provided or none exist", self.statface)

        return info

    def get_user(self):
        """Return the post made by ``self.id`` on ``self.date``.

        Returns ``{"posts": {...}}``, ``{"posts": None}`` when the user has no
        post for that date, or ``None`` when the user or date is missing.
        """
        if not self.id or not self.date:
            status("FAIL", "No posts fetched, invalid inputs", self.statface)
            return None

        self.cur.execute("SELECT post_id FROM posts WHERE user_id=? AND date=?", (self.id, self.date))
        rez = self.cur.fetchone()
        if not rez:
            status("WARN", "No post(s) exist for that user", self.statface)
            return {"posts": None}

        post_info = post(post_id=rez[0])
        post_info.columns = self.columns
        info = post_info.get()
        status("INFO", "Post(s) Succesfully fetched", self.statface)
        return info

    def get_friends(self):
        """Return the current post of each friend of ``self.id``.

        Returns ``{"posts": [...]}`` with one entry per friend (``None`` for a
        friend without a post), ``{"posts": None}`` when there are no friends,
        or ``None`` when ``self.id`` is not set.
        """
        if not self.id:
            status("FAIL", "Could not fetch friend(s) post(s), invalid data provided", self.statface)
            return None

        friends_info = info_friends(user_id=self.id).get()
        friends = friends_info['friends'] if dict_key_verify(friends_info, 'friends') else None
        if not friends:
            status("WARN", "Could not fetch friend(s) post(s), no friends exist", self.statface)
            return {"posts": None}

        posts = []
        for friend in friends:
            friend_info = post(username=friend['username'])
            friend_info.columns = self.columns
            # ``get_user`` returns None for an unresolvable user; guard it.
            friend_posts = friend_info.get_user()
            posts.append(friend_posts['posts'] if friend_posts else None)

        status("INFO", "Succesfully fetched friend(s) post(s)", self.statface)
        return {"posts": posts}

    def get_team(self):
        """Return the current post of each member of the user's team.

        Returns ``{"posts": [...]}`` with one entry per member,
        ``{"posts": None}`` when the member list is empty, or ``None`` when
        the members or the team could not be resolved.
        """
        members_data = info_team(user_id=self.id, username=self.username, occupation_id=self.occupation_id, team_id=self.team_id)
        members_info = members_data.get_members()
        if not members_info:
            status("WARN", "Team posts unable to be fetched, no other team members", self.statface)
            return None

        members = members_info['members']
        if not members:
            status("WARN", "Team posts unable to be fetched, no team members provided", self.statface)
            return {"posts": None}

        if not members_data.team_id:
            status("FAIL", "Team posts unable to be fetched, invalid data provided", self.statface)
            return None

        posts = []
        for member in members:
            member_info = post(username=member['username'])
            member_info.columns = self.columns
            member_posts = member_info.get_user()
            posts.append(member_posts['posts'] if member_posts else None)

        status("INFO", "Team posts Succesfully fetched", self.statface)
        return {"posts": posts}

    def get_permissions(self):
        """Return what ``self.id`` may do with the post ``self.post_id``.

        Returns ``{"delete": bool, "edit": bool}`` or ``None`` when the user
        or post is invalid.  Management and admins may delete, admins may
        edit, and the author and the author's team leaders may delete.
        """
        if not self.id or not self.post_id:
            status("FAIL", "Permissions could not be fetched, invalid data provided", self.statface)
            return None

        info = {"delete": False, "edit": False}

        subject = info_auth(user_id=self.id, items=['level', 'username']).get()
        if subject['level'] in ("management", "admin"):
            info['delete'] = True
        if subject['level'] == "admin":
            info['edit'] = True

        # Fixed: ``get`` nests its result under "posts", so the original
        # top-level lookup of 'username' never matched and the author/leader
        # checks never ran.  ``items=`` was also swallowed by **kwargs, so the
        # column is requested through ``columns`` like elsewhere in this file.
        target = post(post_id=self.post_id)
        target.columns = ['username']
        target_info = target.get()
        target_username = None
        if dict_key_verify(target_info, 'posts'):
            target_username = target_info['posts'].get('username')

        if target_username:
            if subject['username'] == target_username:
                info['delete'] = True

            target_team_info = info_team(username=target_username).get_leaders()
            if dict_key_verify(target_team_info, 'leaders'):
                if subject['username'] in target_team_info['leaders']:
                    info['delete'] = True

        status("INFO", "Permissions succesfully fetched", self.statface)
        return info

    def set(self, data=None):
        """Create today's post for ``self.id`` from ``data``.

        ``data['content']`` holds the image bytes; ``data['caption']`` is
        optional and limited to 100 characters.  The post is rejected when a
        post already exists for this user and date, when the time is not
        valid, when a supplied caption is invalid, or when the image cannot be
        stored.
        """
        self.caption = None
        caption_intended = False
        if dict_key_verify(data, 'caption'):
            caption_intended = True
            if len(data['caption']) <= 100:
                self.caption = data['caption']
        caption_ok = bool(self.caption) if caption_intended else True

        valid_time = timestamp().is_valid_time()
        self.cur.execute("SELECT post_id FROM posts WHERE user_id=? AND date=?", (self.id, self.date))
        already_posted = self.cur.fetchone()

        accepted = bool(self.id and dict_key_verify(data, 'content') and caption_ok and valid_time and not already_posted)
        if accepted:
            # The image is written to disk only after every other check has
            # passed.  The original saved it first, so a rejected second post
            # overwrote the image file of the existing post (same path).
            self.content = data['content']

        if accepted and self.content:
            post_id = uuid_generate()
            self.cur.execute("INSERT INTO posts (post_id, user_id, content, caption, date) VALUES (?, ?, ?, ?, ?)", (post_id, self.id, self.content, self.caption, self.date))
            self.db.commit()
            status("INFO", "Post successfully created", self.statface)
        else:
            status("FAIL", "Post could not be created, invalid data provided", self.statface)

    def delete(self):
        """Delete ``self.post_id``, falling back to the user's post for ``self.date``."""
        if self.id and self.date:
            self.cur.execute("SELECT post_id FROM posts WHERE user_id=? AND date=?", (self.id, self.date))
            rez = self.cur.fetchone()
            if rez and not self.post_id:
                self.post_id = rez[0]

        if self.post_id:
            self.cur.execute("DELETE FROM posts WHERE post_id=?", (self.post_id,))
            self.db.commit()
            status("INFO", "Post successfully deleted", self.statface)
        else:
            status("FAIL", "Post could not be deleted, invalid data provided", self.statface)

    @staticmethod
    def _impression_count(entry):
        """Sort key: the entry's ``impression_count``, or 0 when it has none."""
        if isinstance(entry, dict):
            return entry.get('impression_count', 0)
        return 0

    def _sort(self, posts):
        """Annotate posts with their impression count and sort them ascending.

        Each dict that has a ``post_id`` gets an ``impression_count`` key.
        Returns a new list; entries without a count sort as 0.  The original
        called ``__merge``/``_sort`` without ``self`` (a NameError) and
        re-counted impressions at every recursion level; a stable ``sorted``
        gives the ordering the merge sort was meant to produce.
        """
        for entry in posts:
            if dict_key_verify(entry, "post_id"):
                entry['impression_count'] = post_impressions(post_id=entry['post_id']).count()
        return sorted(posts, key=self._impression_count)

    def __merge(self, left, right):
        """Merge two lists sorted by impression count into one sorted list.

        Ties are taken from ``left`` first.  The original built ``result`` but
        never returned it.
        """
        result = []
        index_left = index_right = 0
        count = self._impression_count
        while index_left < len(left) and index_right < len(right):
            if count(left[index_left]) <= count(right[index_right]):
                result.append(left[index_left])
                index_left += 1
            else:
                result.append(right[index_right])
                index_right += 1

        # At most one of the two still has elements left.
        result.extend(left[index_left:])
        result.extend(right[index_right:])
        return result
|
|
|
|
class comment(user_content):
    """A text comment on a post.

    NOTE(review): ``self.cur``, ``self.db``, ``self.id``, ``self.columns``
    and ``self.statface`` are used here but defined elsewhere -- presumably
    in ``table``; confirm there.
    """

    def __init__(self, user_id=None, username=None, occupation_id=None, team_id=None, comment_id=None, post_id=None, content=None, *args, **kwargs):
        """Set the comment columns and delegate to ``user_content``."""
        self.allowed_columns = ['comment_id', 'post_id', 'username', 'content']
        super().__init__(user_id=user_id, username=username, occupation_id=occupation_id, team_id=team_id, comment_id=comment_id, post_id=post_id, content=content)

    def get(self):
        """Fetch the requested columns of the comment ``self.comment_id``.

        Returns ``{'comments': {column: value, ...}}`` or ``None`` when no
        valid comment ID is set.  ``username`` is resolved from the stored
        ``user_id``.
        """
        if not self.comment_id:
            status("FAIL", "Comment unable to be fetched, invalid commentID provided", self.statface)
            return None

        columns = self.columns
        info = {'comments': {column: None for column in columns}}
        # The original attached this warning to a ``for ... else`` clause, so
        # it was logged after every fetch, not only when nothing was requested.
        if not columns:
            status("WARN", "No data requested to be fetched, check inputs", self.statface)

        for column in columns:
            db_column = "user_id" if column == "username" else column
            # NOTE(review): the column name is interpolated into the SQL text;
            # safe only if ``self.columns`` is restricted to allowed columns.
            self.cur.execute(f"SELECT {db_column} FROM comments WHERE comment_id = ?", (self.comment_id,))
            rez = self.cur.fetchone()
            if not rez:
                status("FAIL", f"Comment {db_column} unable to be fetched, something went wrong", self.statface)
                continue

            comment_info = rez[0]
            if db_column == "user_id":
                column = "username"
                comment_info = auth(user_id=comment_info).get()['username']
            info['comments'][column] = comment_info

        status("INFO", "Succesfully fetched comment", self.statface)
        return info

    def get_post(self):
        """Return every comment on the post ``self.post_id``.

        Returns ``{'comments': [...]}``, ``{'comments': None}`` when the post
        has no comments, or ``None`` when no valid post ID is set.
        """
        if not self.post_id:
            # Message corrected: the missing identifier here is the post ID.
            status("FAIL", "Comment(s) unable to be fetched, invalid postID provided", self.statface)
            return None

        info = {'comments': None}
        self.cur.execute("SELECT comment_id FROM comments WHERE post_id=?", (self.post_id,))
        rez = self.cur.fetchall()
        if not rez:
            status("WARN", "No comments related to requested post", self.statface)
            return info

        # ``get`` works on ``self.comment_id``; remember the current value and
        # restore it afterwards so listing comments does not leave this
        # instance pointing at the last comment (e.g. for a later ``delete``).
        original_comment_id = self.comment_id
        comments = []
        try:
            for row in rez:
                self.comment_id = row[0]
                data = self.get()
                comments.append(data['comments'] if data else None)
        finally:
            self.comment_id = original_comment_id

        info['comments'] = comments
        status("INFO", "Succesfully fetched comment(s)", self.statface)
        return info

    def get_permissions(self):
        """Return what ``self.id`` may do with the comment ``self.comment_id``.

        Returns ``{"delete": bool, "edit": bool}`` or ``None`` when the user
        or comment is invalid.  Management and admins may delete, admins may
        edit, and the author and the author's team leaders may delete.
        """
        if not self.id or not self.comment_id:
            status("FAIL", "Unable to get comment permissions, invalid user or commentID provided", self.statface)
            return None

        info = {"delete": False, "edit": False}

        subject = info_auth(user_id=self.id, items=['level', 'username']).get()
        if subject['level'] in ("management", "admin"):
            info['delete'] = True
        if subject['level'] == "admin":
            info['edit'] = True

        # Fixed: ``get`` nests its result under "comments", so the original
        # top-level lookup of 'username' never matched and the author/leader
        # checks never ran.  ``items=`` was also swallowed by **kwargs, so the
        # column is requested through ``columns`` like elsewhere in this file.
        target = comment(comment_id=self.comment_id)
        target.columns = ['username']
        target_info = target.get()
        target_username = None
        if dict_key_verify(target_info, 'comments'):
            target_username = target_info['comments'].get('username')

        if target_username:
            if subject['username'] == target_username:
                info['delete'] = True

            target_team_info = info_team(username=target_username).get_leaders()
            if dict_key_verify(target_team_info, 'leaders'):
                if subject['username'] in target_team_info['leaders']:
                    info['delete'] = True

        status("INFO", "Succesfully fetched comment permissions", self.statface)
        return info

    def set(self, data=None):
        """Create a comment on ``self.post_id`` and notify the post's author.

        ``data['content']``, when supplied, replaces ``self.content``.
        Requires content, a valid post and a valid user.
        """
        if dict_key_verify(data, 'content'):
            self.content = data['content']

        if not (self.content and self.post_id and self.id):
            status("FAIL", "Could not create comment, invalid content, postID or user provided", self.statface)
            return

        comment_id = uuid_generate()
        self.cur.execute("INSERT INTO comments (post_id, comment_id, user_id, content) VALUES (?,?,?,?)", (self.post_id, comment_id, self.id, self.content))
        self.db.commit()
        status("INFO", "Succesfully created comment", self.statface)

        # Look the author up directly.  The original loaded the whole post
        # (image file included) to get the username and then passed it
        # positionally to ``info_user_id``, whereas the rest of this file
        # passes usernames by keyword (``username=``).
        self.cur.execute("SELECT user_id FROM posts WHERE post_id=?", (self.post_id,))
        poster = self.cur.fetchone()
        if poster:
            notif_data = {'target_id': poster[0], 'title': "New comment on post", 'content': "Someone commented on your post!"}
            notification().create(notif_data)

    def delete(self):
        """Delete the comment ``self.comment_id``."""
        if self.comment_id:
            self.cur.execute("DELETE FROM comments WHERE comment_id=?", (self.comment_id,))
            self.db.commit()
            status("INFO", "Succesfully deleted comment", self.statface)
        else:
            status("FAIL", "Could not delete comment, invalid commendID provided", self.statface)
|
|
|
|
class impression(user_content):
|
|
# this class is inherited by post_impression and comment_impression
|
|
# because of this it uses attributes for the table names and the table fields
|
|
# They are interpolated directly into the SQL string because they are NOT user-defined, so there is no injection risk in doing so.
|
|
@property
def impression_id(self):
    """Impression ID, or ``None`` when it could not be validated."""
    return self._impression_id

@impression_id.setter
def impression_id(self, value):
    """Store ``value`` only if it exists in the impressions table.

    When no table name has been set yet (attribute missing or ``None``) the
    ID cannot be validated and ``None`` is stored; the original built the
    query with that missing name and raised instead.
    """
    # ``getattr`` with a default also covers a ``table_name`` property whose
    # backing field has not been assigned yet.
    table_name = getattr(self, "table_name", None)
    if not table_name:
        value = None
    else:
        # The table name is one of two fixed, validated names, never user input.
        self.cur.execute(f"SELECT impression_id FROM {table_name} WHERE impression_id = ?", (value,))
        if not self.cur.fetchone():
            value = None
    self._impression_id = value
|
|
|
|
@property
def impression_type(self):
    """Kind of impression, or ``None`` if the assigned value was not allowed."""
    return self._impression_type

@impression_type.setter
def impression_type(self, value):
    # Only values listed in ``self.types`` are accepted.
    self._impression_type = value if value in self.types else None
|
|
|
|
@property
def table_name(self):
    """Name of the impressions table, or ``None`` if it was not recognised."""
    return self._table_name

@table_name.setter
def table_name(self, value):
    # Only the two impression tables are accepted; anything else becomes None.
    if value not in ("post_impressions", "comment_impressions"):
        value = None
    self._table_name = value
    if not value:
        return
    # Keep the ID column name in step: "post_impressions" -> "post_id".
    self.attr_name = value.replace("impressions", "id")
|
|
|
|
@property
|
|
def attr_name(self):
|
|
return self._attr_name
|
|
@attr_name.setter
|
|
def attr_name(self, value):
|
|
if value != "post_id" and value != "comment_id":
|
|
value = None
|
|
self._attr_name = value
|
|
if not self.table_name and value:
|
|
self.table_name = value.replace("id", "impressions")
|
|
|
|
@property
|
|
def attr_id(self):
|
|
return self._attr_id
|
|
@attr_id.setter
|
|
def attr_id(self, value):
|
|
root_table = "comments"
|
|
if self.attr_name:
|
|
if "post" in self.attr_name:
|
|
root_table = "posts"
|
|
self.cur.execute(f"SELECT user_id FROM {root_table} WHERE {self.attr_name} = ?", (value,))
|
|
if not self.cur.fetchone():
|
|
value = None
|
|
else:
|
|
value = None
|
|
self._attr_id = value
|
|
|
|
def __init__(self, user_id=None, username=None, comment_id=None, post_id=None, impression_id=None, impression_type=None, table_name=None, attr_name=None, attr_id=None, *args, **kwargs):
|
|
if not hasattr(self, "allowed_columns"):
|
|
self.allowed_columns = ['impression_id','username','type']
|
|
super().__init__(user_id=user_id, username=username, post_id=post_id, comment_id=comment_id)
|
|
self.impression_id = impression_id
|
|
|
|
if not hasattr(self, "types"):
|
|
self.types = ['like']
|
|
if not hasattr(self, "table_name"):
|
|
self.table_name = table_name
|
|
if not hasattr(self, "attr_name"):
|
|
self.attr_name = attr_name
|
|
if not hasattr(self, "attr_id"):
|
|
self.attr_id = attr_id
|
|
|
|
def get(self):
|
|
info = {'impressions':None}
|
|
info['impressions'] = [{column: None for column in self.columns}]
|
|
|
|
for column in self.columns:
|
|
if column == "username":
|
|
column = "user_id"
|
|
self.cur.execute(f"SELECT {column} FROM {self.table_name} WHERE impression_id=?", (self.impression_id,))
|
|
rez = self.cur.fetchone()
|
|
if rez:
|
|
rez_info = rez[0]
|
|
if column == "user_id":
|
|
column = "username"
|
|
rez_info = auth(user_id=rez_info, items=['username']).get()['username']
|
|
info['impressions'][0][column] = rez_info
|
|
else:
|
|
status("FAIL", f"Impression {column} could not be fetched, something went wrong", self.statface)
|
|
else:
|
|
status("WARN", "No data requested to be fetched, check inputs", self.statface)
|
|
|
|
if not self.impression_id:
|
|
info = None
|
|
status("FAIL", "Impressions could not be fetched, invalid impressionID provided", self.statface)
|
|
else:
|
|
status("INFO", "Impressions succesfully fetched", self.statface)
|
|
|
|
return info
|
|
|
|
def get_content(self):
|
|
info = {'impressions': None}
|
|
|
|
if self.attr_name:
|
|
self.cur.execute(f"SELECT impression_id FROM {self.table_name} WHERE user_id=? AND {self.attr_name}=?", (self.id,self.attr_id))
|
|
rez = self.cur.fetchall()
|
|
if rez:
|
|
info['impressions'] = [{column: None for column in self.columns} for impression_id in rez]
|
|
for i, impression_id in enumerate(rez):
|
|
impression_info = self.class_type()
|
|
impression_info.impression_id = impression_id[0]
|
|
impression_info.columns = self.columns
|
|
impression_info = impression_info.get()['impressions'][0]
|
|
info['impressions'][i] = impression_info
|
|
else:
|
|
status("WARN", "Post/comment has no impressions associated", self.statface)
|
|
else:
|
|
status("WARN", "Impression(s) unable to be fetched, somethign went wrong", self.statface)
|
|
else:
|
|
status("FAIL", "Impression(s) unable to be fetched, impression type unspecified", self.statface)
|
|
|
|
if not self.attr_id:
|
|
info = None
|
|
status("FAIL", "Impression(s) unable to be fetched, invalid post/comment ID provided", self.statface)
|
|
else:
|
|
status("INFO", "Succesfully fetched impression(s)", self.statface)
|
|
|
|
return info
|
|
|
|
def count(self, data=None):
|
|
info = {'impression_count': 0}
|
|
|
|
if dict_key_verify(data, "impression_type") and not self.impression_type:
|
|
self.impression_type = data['impression_type']
|
|
|
|
if self.impression_type:
|
|
self.cur.execute(f"SELECT COUNT(*) FROM {self.table_name} WHERE type = ? AND {self.attr_name} = ?", (self.impression_type,self.attr_id))
|
|
rez = self.cur.fetchall()
|
|
if rez:
|
|
info['impression_count'] = rez[0][0]
|
|
status("INFO", "Succesfully fetched impression count", self.statface)
|
|
else:
|
|
status("FAIL", "Impression count unable to be fetched, something went wrong", self.statface)
|
|
|
|
else:
|
|
info = None
|
|
status("FAIL", "Impression count unable to be fetched, invalid impression type", self.statface)
|
|
|
|
return info
|
|
|
|
def set(self, data=None):
|
|
if dict_key_verify(data, "impression_type"):
|
|
self.impression_type = data['impression_type']
|
|
impression_id = uuid_generate()
|
|
|
|
exists = False
|
|
self.cur.execute(f"SELECT type FROM {self.table_name} WHERE user_id=? AND type=? AND {self.attr_name}=?", (self.id, self.impression_type, self.attr_id))
|
|
if self.cur.fetchone():
|
|
exists = True
|
|
else:
|
|
status("WARN", "Impression from this user of this type already exists on this content", self.statface)
|
|
|
|
if self.impression_type and self.id and self.attr_id:
|
|
if not exists:
|
|
self.cur.execute(f"INSERT INTO {self.table_name} (impression_id, {self.attr_name}, user_id, type) VALUES (?, ?, ?, ?)", (impression_id, self.attr_id, self.id , self.impression_type))
|
|
self.db.commit()
|
|
status("INFO", "Impression succesfully created", self.statface)
|
|
else:
|
|
status("FAIL", "Impression unable to be created, impression already exists", self.statface)
|
|
else:
|
|
status("FAIL", "Impression unable to be created, invalid impression type, user or post/comment ID provided", self.statface)
|
|
|
|
def delete(self, data=None):
|
|
if dict_key_verify(data, "impression_id"):
|
|
self.impression_id = data['impression_id']
|
|
|
|
if self.impression_id:
|
|
self.cur.execute(f"DELETE FROM {self.table_name} WHERE impression_id=?", (self.impression_id,))
|
|
self.db.commit()
|
|
status("INFO", "Succesfully deleted impression", self.statface)
|
|
else:
|
|
status("FAIL", "Impression unable to be deleted, invalid impression ID provided", self.statface)
|
|
|
|
class post_impression(impression):
    """Impression (currently only 'like') attached to a post.

    Stores its rows in the ``post_impressions`` table and mirrors the
    validated post id into ``attr_id`` so the generic queries of the parent
    class target the right post.
    """

    def __init__(self, user_id=None, username=None, post_id=None, impression_type=None, *args, **kwargs):
        # The parent constructor only fills in attributes that are still
        # missing, so the post-specific configuration goes first.
        self.class_type = post_impression
        self.table_name = "post_impressions"
        self.types = ['like']
        self.allowed_columns = ['impression_id','post_id','username','type']
        super().__init__(
            user_id=user_id,
            username=username,
            post_id=post_id,
            impression_type=impression_type,
        )

        # Re-apply the validated post id as the generic target id.
        validated_post = self.post_id
        if validated_post:
            self.attr_id = validated_post

    @property
    def post_id(self):
        return self._post_id

    @post_id.setter
    def post_id(self, value):
        # A post id is kept only when a matching row exists in ``posts``.
        self.cur.execute("SELECT content FROM posts WHERE post_id = ?", (value,))
        found = self.cur.fetchone()
        checked = value if found else None
        self._post_id = checked
        self.attr_id = checked

    def get_post(self):
        """Return the result of ``get_content()`` for this post."""
        return self.get_content()
|
|
|
|
class comment_impression(impression):
    """Impression (currently only 'like') attached to a comment.

    Stores its rows in the ``comment_impressions`` table and mirrors the
    validated comment id into ``attr_id`` so the generic queries of the
    parent class target the right comment.
    """

    def __init__(self, user_id=None, username=None, comment_id=None, impression_type=None, *args, **kwargs):
        # The parent constructor only fills in attributes that are still
        # missing, so the comment-specific configuration goes first.
        self.types = ['like']
        self.table_name = "comment_impressions"
        # Was misspelled "allow_columns", so this whitelist was never picked
        # up and 'comment_id' was not among the allowed columns.
        self.allowed_columns = ['impression_id', 'comment_id', 'username', 'type']
        self.class_type = comment_impression
        super().__init__(user_id=user_id, username=username, comment_id=comment_id, impression_type=impression_type)

        # Re-apply the validated comment id as the generic target id.
        if self.comment_id:
            self.attr_id = self.comment_id

    def get_comment(self):
        """Return the result of ``get_content()`` for this comment."""
        return self.get_content()

    @property
    def comment_id(self):
        return self._comment_id

    @comment_id.setter
    def comment_id(self, value):
        # A comment id is kept only when a matching row exists in ``comments``.
        self.cur.execute("SELECT content FROM comments WHERE comment_id=?", (value,))
        if not self.cur.fetchone():
            value = None
        self._comment_id = value
        self.attr_id = value
|
|
|