This repository has been archived on 2025-02-10. You can view files and clone it, but cannot push or open issues or pull requests.
Files
beopen/server/modules/handler/handler.py
2025-02-10 12:37:33 +00:00

1008 lines
40 KiB
Python

from modules.auth.auth import authorised
from modules.track import *
send_status = logging.status.send_status
status = logging.status
log = logging.log
from modules.algorithms.univ import dict_key_verify
from modules.user import info as user_info
from modules.user import content as content_info
from modules.data.datetime import timestamp
from modules.data.config import read as config_read
from modules.data.database import key as db_key
from modules.data.database import encryption as db_encrpytion
import eventlet
# In this section, client event calls invoke a handler. That handler then calls the root handler, passing the target method as an argument.
# This allows one method (the root handler) to take care of the overhead tasks that are required for every event called.
# This is done to keep the code simple and reduce boilerplate.
class root_handler():
    """Base class shared by every event handler in this module.

    Concrete handlers expose public event methods that pass a private worker
    method to :meth:`handle`. ``handle`` runs the checks every event needs
    (server mode, minimum permission level) before calling the worker.

    ``handle`` writes to ``self.obj``, which this class never assigns;
    subclasses are expected to set it in their own ``__init__``.
    """

    # permission levels ordered from least to most privileged
    _LEVELS = ('member', 'management', 'admin')

    def __init__(self, sio, sid, session, min_level='admin', event_name='event', *args, **kwargs):
        """Store the connection context and pre-compute the caller's permissions.

        Args:
            sio: server object; must provide ``get_session(sid)``.
            sid: id of the calling client.
            session: server session object (``handle`` reads ``session.mode``).
            min_level: minimum level required for events run through ``handle``.
            event_name: label used in the "not authorised" status message.
        """
        self.info = None
        self.status = None
        self.sio = sio
        self.sid = sid
        self.event_name = event_name
        self.min_level = min_level
        self.session = session
        client_session = sio.get_session(sid)
        self.user_id = client_session['id']
        self.user_level = client_session['level']
        # permissions levels
        self.member = self.authorised(level='member')
        self.management = self.authorised(level='management')
        self.admin = self.authorised(level='admin')
        # statface is the interface used for status messages
        self.statface = logging.status_interface(sio, sid, self.user_id, self)

    def handle(self, method, data=None, auth=None):
        """Run ``method(data=data)`` if the server is in normal mode and the caller is authorised.

        Args:
            method: worker callable accepting a ``data`` keyword argument.
            data: payload forwarded to ``method``.
            auth: unused; kept so existing callers keep working.
        """
        if self.session.mode != "normal":
            status("FAIL", f"Server is currently in {self.session.mode} and so is not accepting calls to this event, try again later", self.statface)
            return
        self.obj.id = self.user_id
        # anytime a status message is created this interface is also passed
        self.obj.statface = self.statface
        # checking the user calling the event is authorised based on their level
        if self.authorised(level=self.min_level):
            method(data=data)
        else:
            status("WARN", f"{self.event_name} - User not authorised", self.statface)

    def _level_check(self, level):
        """Return True when the caller's level is ``level`` or higher; unknown levels are denied."""
        if level not in self._LEVELS:
            return False
        return self.user_level in self._LEVELS[self._LEVELS.index(level):]

    def authorised(self, level=None, username=None, mode="and", lead_username=None, associated_username=None, *args, **kwargs):
        """Check whether the calling user is authorised, by level, leadership and/or association.

        Args:
            level: required permission level ('member', 'management' or 'admin').
            username: shorthand that sets both ``lead_username`` and
                ``associated_username``.
            mode: "and" requires every requested check to pass, "or" requires
                at least one. Any other value returns the list of individual
                check results unchanged.
            lead_username: pass if the caller is listed as a leader of this user.
            associated_username: pass if the caller is a friend of this user or
                a member of the same team.

        Returns:
            bool for modes "and"/"or" (``True`` for "and" when no check was
            requested); otherwise the list of per-check booleans.
        """
        checks = []
        if username:
            lead_username = username
            associated_username = username
        if level:
            checks.append(self._level_check(level))
        if lead_username or associated_username:
            # only looked up when a username based check actually needs it
            client_username = user_info.auth(user_id=self.user_id).get()['username']
        if lead_username:
            # checks if the user has leader privileges over the target user
            subjects_leaders = user_info.team(username=lead_username).get_leaders()['leaders']
            checks.append(bool(subjects_leaders) and client_username in subjects_leaders)
        if associated_username:
            # associated means: in the same team OR friends with the target user,
            # so the friend check must not depend on the target having a team
            subjects_friends = user_info.friend(username=associated_username).get()['friends']
            subject_team = user_info.team(username=associated_username).get_members()
            is_friend = bool(subjects_friends) and client_username in subjects_friends
            is_team_mate = bool(subject_team) and client_username in subject_team['members']
            checks.append(is_friend or is_team_mate)
        # each mode returns straight away so "and" can never fall through to
        # the raw-list branch (a non-empty list is truthy even if it holds False)
        if mode == "and":
            return all(checks)
        if mode == "or":
            return any(checks)
        return checks

    def _leader_check(self, leaders, username):
        """Return True if any entry of ``leaders`` has ``entry['username'] == username``."""
        return any(entry['username'] == username for entry in leaders)

    def is_leader(self, user_id, identifier):
        """Return True if the user ``user_id`` is a leader of the team matched by ``identifier``.

        ``identifier`` is first tried as user id, username, occupation id and
        team id at once; if that yields no leaders it is retried as a user id only.
        """
        username = user_info.auth(user_id=user_id).get()['username']
        leaders = user_info.team(user_id=identifier, username=identifier, occupation_id=identifier, team_id=identifier).get_leaders()['leaders']
        if not leaders:
            leaders = user_info.team(user_id=identifier).get_leaders()['leaders']
        if not leaders:
            return False
        return self._leader_check(leaders, username)
class auth_handler(root_handler):
    """Handler for authentication-info events, backed by ``user_info.auth``.

    Each public method forwards its ``_``-prefixed worker to ``handle`` and
    returns ``(self.info, self.status)``.
    """

    def __init__(self, sio, sid, session, min_level='admin', event_name='authentication info event', *args, **kwargs):
        super().__init__(sio, sid, session, min_level=min_level, event_name=event_name, *args, **kwargs)
        self.obj = user_info.auth(user_id=self.user_id)

    def get(self, data=None):
        self.handle(self._get, data)
        return self.info, self.status

    def _get(self, data=None):
        """Fetch auth info into ``self.info``; only admins may target another user via 'username'."""
        if dict_key_verify(data, 'username') and self.admin:
            self.obj.username = data['username']
        if dict_key_verify(data, 'items'):
            self.obj.columns = data['items']
        self.info = self.obj.get()

    def set(self, data=None):
        # must dispatch the _set worker; dispatching _get would only read data
        self.handle(self._set, data)
        return self.info, self.status

    def _set(self, data=None):
        """Apply the optional 'username'/'items' selectors to ``self.obj`` and write ``data``."""
        if dict_key_verify(data, 'username'):
            self.obj.username = data['username']
        if dict_key_verify(data, 'items'):
            self.obj.columns = data['items']
        self.obj.set(data)
class profile_handler(root_handler):
    """Handler for profile events, backed by ``user_info.profile``.

    Each public method forwards its ``_``-prefixed worker to ``handle`` and
    returns ``(self.info, self.status)``.
    """

    def __init__(self, sio, sid, session, min_level='admin', event_name='profile event', *args, **kwargs):
        super().__init__(sio, sid, session, min_level=min_level, event_name=event_name, *args, **kwargs)
        self.obj = user_info.profile(user_id=self.user_id)

    def get(self, data=None):
        self.handle(self._get, data)
        return self.info, self.status

    def _get(self, data=None):
        """Fetch profile data, optionally for another 'username' and limited to 'items'."""
        if dict_key_verify(data, 'username'):
            self.obj.username = data['username']
        if dict_key_verify(data, 'items'):
            self.obj.columns = data['items']
        self.info = self.obj.get()

    def get_permissions(self, data=None):
        self.handle(self._get_permissions, data)
        return self.info, self.status

    def _get_permissions(self, data=None):
        """Fetch permissions using the optional 'username' and 'target_username' selectors."""
        if dict_key_verify(data, 'username'):
            self.obj.username = data['username']
        if dict_key_verify(data, 'target_username'):
            self.obj.target_username = data['target_username']
        self.info = self.obj.get_permissions()

    def set(self, data=None):
        self.handle(self._set, data)
        return self.info, self.status

    def _set(self, data=None):
        """Write every key of ``data`` to the profile.

        Another user's profile is only targeted when the caller is management
        or is authorised over that username (mode "or").
        """
        if dict_key_verify(data, 'username'):
            if self.authorised('management', data['username'], "or"):
                self.obj.username = data['username']
        self.obj.columns = list(data.keys())
        self.obj.set(data)
        # pass the status interface like every other status call in this module
        status("INFO", "item(s) successfully set to provided value(s)", self.statface)

    def delete(self, data=None):
        self.handle(self._delete, data)
        return self.info, self.status

    def _delete(self, data=None):
        """Delete profile items; only management may target another 'username'."""
        if dict_key_verify(data, 'username') and self.management:
            self.obj.username = data['username']
        if dict_key_verify(data, 'items'):
            self.obj.columns = data['items']
        self.obj.delete()
class occupation_handler(root_handler):
    """Handler for occupation events, backed by ``user_info.occupation``.

    Each public method forwards its ``_``-prefixed worker to ``handle`` and
    returns ``(self.info, self.status)``.
    """

    def __init__(self, sio, sid, session, min_level='admin', event_name='occupation event', *args, **kwargs):
        # forward the caller's min_level instead of forcing 'admin'
        super().__init__(sio, sid, session, min_level=min_level, event_name=event_name, *args, **kwargs)
        self.obj = user_info.occupation(user_id=self.user_id)

    def get(self, data=None):
        self.handle(self._get, data)
        return self.info, self.status

    def _get(self, data=None):
        """Fetch occupation data using the optional 'username', 'occupation_id' and 'items' selectors."""
        if dict_key_verify(data, 'username'):
            self.obj.username = data['username']
        if dict_key_verify(data, 'occupation_id'):
            self.obj.occupation_id = data['occupation_id']
        if dict_key_verify(data, 'items'):
            self.obj.columns = data['items']
        self.info = self.obj.get()

    def get_all(self, data=None):
        self.handle(self._get_all, data)
        return self.info, self.status

    def _get_all(self, data):
        """Fetch every occupation; ``data`` is ignored."""
        self.info = self.obj.get_all()

    def set(self, data=None):
        self.handle(self._set, data)
        return self.info, self.status

    def _set(self, data=None):
        """Set a user's occupation; only management may target another 'username'."""
        if dict_key_verify(data, 'username') and self.management:
            self.obj.username = data['username']
        if dict_key_verify(data, 'occupation_id'):
            self.obj.set(data)
            status("INFO", "Occupation successfully set", self.statface)
        else:
            status("WARN", "Occupation couldnt be updated no value(s) provided", self.statface)

    def set_request(self, data=None):
        self.handle(self._set_request, data)
        return self.info, self.status

    def _set_request(self, data=None):
        """Create an occupation change request; only admins may target another 'username'."""
        if dict_key_verify(data, 'username') and self.admin:
            self.obj.username = data['username']
        if dict_key_verify(data, 'occupation_id'):
            self.obj.set_request(data)
            status("INFO", "Occupation change request successfully created", self.statface)
        else:
            status("WARN", "Occupation change request could not be created no value(s) provided", self.statface)

    def get_request(self, data=None):
        self.handle(self._get_request, data)
        return self.info, self.status

    def _get_request(self, data=None):
        """Fetch a change request; only management may target another 'username'."""
        if dict_key_verify(data, 'username') and self.management:
            self.obj.username = data['username']
        self.info = self.obj.get_request()

    def get_all_request(self, data=None):
        self.handle(self._get_all_request, data)
        return self.info, self.status

    def _get_all_request(self, data=None):
        """Fetch every change request; ``data`` is ignored."""
        self.info = self.obj.get_all_requests()

    def delete_request(self, data=None):
        self.handle(self._delete_request, data)
        return self.info, self.status

    def _delete_request(self, data=None):
        """Delete a change request; only management may target another 'username'."""
        if dict_key_verify(data, 'username') and self.management:
            self.obj.username = data['username']
        self.obj.delete_request()

    def approve_request(self, data=None):
        self.handle(self._approve_request, data)
        return self.info, self.status

    def _approve_request(self, data=None):
        """Approve the change request of the user named in 'username'."""
        if dict_key_verify(data, 'username'):
            self.obj.username = data['username']
            self.obj.approve_request()
            status("INFO", "Occupation change request successfully approved", self.statface)
        else:
            status("FAIL", "Occupation change request unable to be approved invalid data provided", self.statface)

    def reject_request(self, data=None):
        # dispatch the reject worker so the request is rejected rather than deleted
        self.handle(self._reject_request, data)
        return self.info, self.status

    def _reject_request(self, data=None):
        """Reject the change request of the user named in 'username'."""
        if dict_key_verify(data, 'username'):
            self.obj.username = data['username']
            self.obj.reject_request()
            status("INFO", "Occupation change request successfully rejected", self.statface)
        else:
            status("FAIL", "Occupation change request unable to be rejected invalid data provided", self.statface)

    def create(self, data=None):
        self.handle(self._create, data)
        return self.info, self.status

    def _create(self, data=None):
        """Create an occupation; ``data`` must contain 'name'."""
        if dict_key_verify(data, "name"):
            self.obj.create(data)
            status("INFO", "Occupation successfully created", self.statface)
        else:
            self.status = logging.status("WARNING", "no value(s) provided").status_update(None)
            status("FAIL", "Occupation could not be created invalid data provided", self.statface)

    def edit(self, data=None):
        self.handle(self._edit, data)
        return self.info, self.status

    def _edit(self, data=None):
        """Edit the occupation named by 'occupation_id' using every key of ``data``."""
        if dict_key_verify(data, 'occupation_id'):
            self.obj.occupation_id = data['occupation_id']
            self.obj.columns = list(data.keys())
            self.obj.edit(data)
            status("INFO", "Occupation successfully edited", self.statface)
        else:
            status("FAIL", "Occupation could not be edited invalid data provided", self.statface)

    def delete_occupation(self, data=None):
        self.handle(self._delete_occupation, data)
        return self.info, self.status

    def _delete_occupation(self, data=None):
        """Delete the occupation named by 'occupation_id'."""
        if dict_key_verify(data, 'occupation_id'):
            self.obj.occupation_id = data['occupation_id']
            self.obj.delete_occupation()
            status("INFO", "Occuaption successfully deleted", self.statface)
        else:
            status("FAIL", "Occupation could not be deleted invalid data provided", self.statface)
class team_handler(root_handler):
    """Handler for team events, backed by ``user_info.team``.

    Each public method forwards its ``_``-prefixed worker to ``handle`` and
    returns ``(self.info, self.status)``.
    """

    def __init__(self, sio, sid, session, min_level='admin', event_name='team event', *args, **kwargs):
        # forward the caller's min_level instead of forcing 'admin'
        super().__init__(sio, sid, session, min_level=min_level, event_name=event_name, *args, **kwargs)
        self.obj = user_info.team(user_id=self.user_id)

    def _apply_target(self, data, id_key='user_id'):
        """Copy the team selectors present in ``data`` onto ``self.obj``.

        ``id_key`` names the payload key holding the user id; the workers in
        this class use either 'user_id' or 'id'.
        """
        if dict_key_verify(data, 'username'):
            self.obj.username = data['username']
        if dict_key_verify(data, id_key):
            self.obj.id = data[id_key]
        if dict_key_verify(data, 'occupation_id'):
            self.obj.occupation_id = data['occupation_id']
        if dict_key_verify(data, 'team_id'):
            self.obj.team_id = data['team_id']

    def get(self, data=None):
        self.handle(self._get, data)
        return self.info, self.status

    def _get(self, data=None):
        """Fetch team data once a team id is known on ``self.obj``."""
        self._apply_target(data, 'user_id')
        if self.obj.team_id:
            self.info = self.obj.get()
        else:
            status("FAIL", "Team data could not be fetched, invalid data provided", self.statface)

    def get_all(self, data=None):
        self.handle(self._get_all, data)
        return self.info, self.status

    def _get_all(self, data=None):
        """Fetch every team, optionally limited to the columns in 'items'."""
        if dict_key_verify(data, 'items'):
            self.obj.columns = data['items']
        self.info = self.obj.get_all()

    def get_leaders(self, data=None):
        self.handle(self._get_leaders, data)
        return self.info, self.status

    def _get_leaders(self, data=None):
        """Fetch the leaders of the selected team (user id is read from 'id')."""
        self._apply_target(data, 'id')
        if self.obj.team_id:
            self.info = self.obj.get_leaders()
        else:
            status("FAIL", "Team leaders could not be fetched, invalid data provided", self.statface)

    def get_members(self, data=None):
        self.handle(self._get_members, data)
        return self.info, self.status

    def _get_members(self, data=None):
        """Fetch the members of the selected team (user id is read from 'id')."""
        self._apply_target(data, 'id')
        if self.obj.team_id:
            self.info = self.obj.get_members()
        else:
            # pass the status interface like every other status call in this class
            status("FAIL", "Team members could not be fetched, invalid data provided", self.statface)

    def set(self, data=None):
        self.handle(self._set, data)
        return self.info, self.status

    def _set(self, data=None):
        """Update team data; allowed for leaders of the team and for management."""
        self._apply_target(data, 'user_id')
        if dict_key_verify(data, 'items'):
            self.obj.columns = data['items']
        if self.is_leader(self.user_id, self.obj.team_id) or self.management:
            if self.obj.team_id:
                self.obj.set(data)
            else:
                status("FAIL", "Team data not changed invalid data provided", self.statface)
        else:
            status("FAIL", "Team data not changed, not authorised to alter this team", self.statface)

    def delete_leaders(self, data=None):
        self.handle(self._delete_leaders, data)
        return self.info, self.status

    def _delete_leaders(self, data=None):
        """Remove the leaders listed in ``data['leaders']``; allowed for team leaders and management."""
        self._apply_target(data, 'user_id')
        # verify the key first so a payload without 'leaders' reports a failure
        # instead of raising
        if dict_key_verify(data, 'leaders') and data['leaders']:
            if self.is_leader(self.user_id, self.obj.team_id) or self.management:
                self.obj.delete_leaders(data)
            else:
                status("FAIL", "Leader(s) was not deleted, not authorised to alter this team", self.statface)
        else:
            # this branch is about missing input, not about authorisation
            status("FAIL", "Leader(s) was not deleted, invalid data provided", self.statface)
class friend_handler(root_handler):
    """Handler for friend events, backed by ``user_info.friend``.

    Each public method forwards a ``_``-prefixed worker to ``handle`` and
    returns ``(self.info, self.status)``.
    """

    def __init__(self, sio, sid, session, min_level='admin', event_name='friend event', *args, **kwargs):
        # forward the caller's min_level instead of forcing 'admin'
        super().__init__(sio, sid, session, min_level=min_level, event_name=event_name, *args, **kwargs)
        self.obj = user_info.friend(user_id=self.user_id)

    def get(self, data=None):
        self.handle(self._get, data)
        return self.info, self.status

    def _get(self, data=None):
        """Fetch friends; only management may target another 'username'."""
        if dict_key_verify(data, 'username') and self.management:
            self.obj.username = data['username']
        self.info = self.obj.get()

    def get_requests(self, data=None):
        self.handle(self._get_requests, data)
        return self.info, self.status

    def _get_requests(self, data=None):
        """Fetch friend requests; 'mode' selects which ones, with a warning when it is absent."""
        if dict_key_verify(data, 'username') and self.management:
            self.obj.username = data['username']
        if dict_key_verify(data, 'mode'):
            self.obj.mode = data['mode']
        else:
            status("WARN", "No mode provided defaulting to fetching incoming friend request(s)", self.statface)
        self.info = self.obj.get_requests()

    def get_recomendations(self, data=None):
        self.handle(self._get_recomendations, data)
        return self.info, self.status

    def _get_recomendations(self, data=None):
        """Fetch friend recommendations; only management may target another 'username'."""
        if dict_key_verify(data, 'username') and self.management:
            self.obj.username = data['username']
        self.info = self.obj.get_recomendations(data)

    def add_request(self, data=None):
        self.handle(self._add_request, data)
        return self.info, self.status

    def _add_request(self, data=None):
        """Create a friend request; only admins may act as another 'username'."""
        if dict_key_verify(data, 'username') and self.admin:
            self.obj.username = data['username']
        self.info = self.obj.add_request(data)

    def approve_request(self, data=None):
        self.handle(self._approve_request, data)
        return self.info, self.status

    def _approve_request(self, data=None):
        """Approve a friend request; only admins may act as another 'username'."""
        if dict_key_verify(data, 'username') and self.admin:
            self.obj.username = data['username']
        self.info = self.obj.approve_request(data)

    # reject_request, remove_request and remove all share the _remove worker
    def reject_request(self, data=None):
        self.handle(self._remove, data)
        return self.info, self.status

    def remove_request(self, data=None):
        self.handle(self._remove, data)
        return self.info, self.status

    def remove(self, data=None):
        self.handle(self._remove, data)
        return self.info, self.status

    def _remove(self, data=None):
        """Call ``self.obj.remove(data)``; only management may act as another 'username'."""
        if dict_key_verify(data, 'username') and self.management:
            self.obj.username = data['username']
        self.info = self.obj.remove(data)
class post_handler(root_handler):
    """Handler for post events, backed by ``content_info.post``.

    Each public method forwards its ``_``-prefixed worker to ``handle`` and
    returns ``(self.info, self.status)``.
    """

    def __init__(self, sio, sid, session, min_level='admin', event_name='post event', *args, **kwargs):
        super().__init__(sio, sid, session, min_level=min_level, event_name=event_name, *args, **kwargs)
        self.obj = content_info.post(user_id=self.user_id)

    def get_feed(self, data=None):
        self.handle(self._get_feed, data)
        return self.info, self.status

    def _get_feed(self, data=None):
        """Fetch the feed; another 'username' is used only when the caller is authorised for it."""
        if dict_key_verify(data, 'username'):
            if self.authorised('management', data['username'], "or"):
                # the target belongs on the data object held by this handler
                self.obj.username = data['username']
        if dict_key_verify(data, 'items'):
            self.obj.columns = data['items']
        self.info = self.obj.get_feed()

    def get(self, data=None):
        self.handle(self._get, data)
        return self.info, self.status

    def _get(self, data=None):
        """Fetch post data using the optional 'username', 'post_id' and 'items' selectors."""
        if dict_key_verify(data, 'username'):
            if self.authorised('management', data['username'], "or"):
                self.obj.username = data['username']
        if dict_key_verify(data, 'post_id'):
            self.obj.post_id = data['post_id']
        if dict_key_verify(data, 'items'):
            self.obj.columns = data['items']
        self.info = self.obj.get()

    def get_memories(self, data=None):
        self.handle(self._get_memories, data)
        return self.info, self.status

    def _get_memories(self, data=None):
        """Fetch memories; only management may target another 'username'."""
        if dict_key_verify(data, 'username') and self.management:
            self.obj.username = data['username']
        if dict_key_verify(data, 'items'):
            self.obj.columns = data['items']
        self.info = self.obj.get_memories()

    def get_user(self, data=None):
        self.handle(self._get_user, data)
        return self.info, self.status

    def _get_user(self, data=None):
        """Fetch a user's posts; another 'username' is used only when the caller is authorised for it."""
        if dict_key_verify(data, 'username'):
            if self.authorised('management', data['username'], "or"):
                self.obj.username = data['username']
        if dict_key_verify(data, 'items'):
            self.obj.columns = data['items']
        self.info = self.obj.get_user()

    def get_friends(self, data=None):
        # public entry point for the _get_friends worker, matching every other worker
        self.handle(self._get_friends, data)
        return self.info, self.status

    def _get_friends(self, data=None):
        """Fetch friends' posts; only management may target another 'username'."""
        if dict_key_verify(data, 'username') and self.management:
            self.obj.username = data['username']
        if dict_key_verify(data, 'items'):
            self.obj.columns = data['items']
        self.info = self.obj.get_friends()

    def get_team(self, data=None):
        self.handle(self._get_team, data)
        return self.info, self.status

    def _get_team(self, data=None):
        """Fetch team posts; 'team_id' and 'occupation_id' are honoured for management only."""
        if dict_key_verify(data, 'username'):
            if self.authorised('management', lead_username=data['username'], mode="or"):
                self.obj.username = data['username']
        if dict_key_verify(data, 'team_id') and self.management:
            self.obj.team_id = data['team_id']
        if dict_key_verify(data, 'occupation_id') and self.management:
            self.obj.occupation_id = data['occupation_id']
        if dict_key_verify(data, 'items'):
            self.obj.columns = data['items']
        self.info = self.obj.get_team()

    def get_permissions(self, data=None):
        self.handle(self._get_permissions, data)
        return self.info, self.status

    def _get_permissions(self, data=None):
        """Fetch the permissions for a post, optionally on behalf of a led 'username'."""
        if dict_key_verify(data, 'username'):
            if self.authorised('management', lead_username=data['username'], mode="or"):
                self.obj.username = data['username']
        if dict_key_verify(data, 'post_id'):
            self.obj.post_id = data['post_id']
        self.info = self.obj.get_permissions()

    def set(self, data=None):
        self.handle(self._set, data)
        return self.info, self.status

    def _set(self, data=None):
        """Create a post; requires content on ``self.obj`` (normally taken from ``data['content']``)."""
        if dict_key_verify(data, 'username') and self.admin:
            self.obj.username = data['username']
        if dict_key_verify(data, 'content'):
            self.obj.content = data['content']
        if dict_key_verify(data, 'caption'):
            self.obj.caption = data['caption']
        if self.obj.content:
            self.obj.columns = list(data.keys())
            self.obj.set(data)
        else:
            status("FAIL", "Post could not be created, invalid image provided", self.statface)

    def delete(self, data=None):
        self.handle(self._delete, data)
        return self.info, self.status

    def _delete(self, data=None):
        """Delete the post named by 'post_id' when the caller is management or leads its author.

        NOTE(review): the source's indentation was lost; the delete call is
        placed inside the authorisation check here - confirm against the
        original file.
        """
        if dict_key_verify(data, 'username'):
            if self.authorised('management', lead_username=data['username'], mode="or"):
                self.obj.username = data['username']
        if dict_key_verify(data, 'post_id'):
            poster_info = content_info.post(post_id=data['post_id'], items=['username']).get()
            # unwrap the 'posts' envelope when the lookup returns one
            if dict_key_verify(poster_info, 'posts'):
                poster_info = poster_info['posts']
            if dict_key_verify(poster_info, 'username'):
                poster_username = poster_info['username']
                if self.authorised('management', lead_username=poster_username, mode="or"):
                    self.obj.post_id = data['post_id']
                    self.obj.delete()
class comment_handler(root_handler):
    """Handler for comment events, backed by ``content_info.comment``.

    Public methods run a ``_``-prefixed worker through ``handle`` and return
    ``(self.info, self.status)``.
    """

    def __init__(self, sio, sid, session, min_level='admin', event_name='comment event', *args, **kwargs):
        super().__init__(sio, sid, session, min_level=min_level, event_name=event_name, *args, **kwargs)
        self.obj = content_info.comment(user_id=self.user_id)

    def _dispatch(self, worker, data):
        """Run ``worker`` through ``handle`` and return the resulting (info, status) pair."""
        self.handle(worker, data)
        return self.info, self.status

    def _copy_fields(self, data, field_map):
        """For each (key, attribute) pair, copy ``data[key]`` onto ``self.obj`` when the key verifies."""
        for key, attribute in field_map:
            if dict_key_verify(data, key):
                setattr(self.obj, attribute, data[key])

    def get(self, data=None):
        return self._dispatch(self._get, data)

    def _get(self, data=None):
        """Fetch comment data using the optional 'username', 'comment_id' and 'items' selectors."""
        if dict_key_verify(data, 'username') and self.authorised('management', data['username'], "or"):
            self.obj.username = data['username']
        self._copy_fields(data, (('comment_id', 'comment_id'), ('items', 'columns')))
        self.info = self.obj.get()

    def get_post(self, data=None):
        return self._dispatch(self._get_post, data)

    def _get_post(self, data=None):
        """Fetch the comments of a post using the optional 'username', 'post_id' and 'items' selectors."""
        if dict_key_verify(data, 'username') and self.authorised('management', data['username'], "or"):
            self.obj.username = data['username']
        self._copy_fields(data, (('post_id', 'post_id'), ('items', 'columns')))
        self.info = self.obj.get_post()

    def get_permissions(self, data=None):
        return self._dispatch(self._get_permissions, data)

    def _get_permissions(self, data=None):
        """Fetch the permissions for a comment, optionally on behalf of a led 'username'."""
        if dict_key_verify(data, 'username') and self.authorised('management', lead_username=data['username'], mode="or"):
            self.obj.username = data['username']
        self._copy_fields(data, (('comment_id', 'comment_id'),))
        self.info = self.obj.get_permissions()

    def set(self, data=None):
        return self._dispatch(self._set, data)

    def _set(self, data=None):
        """Create a comment from every key of ``data``; only admins may act as another 'username'."""
        if dict_key_verify(data, 'username') and self.admin:
            self.obj.username = data['username']
        self._copy_fields(data, (('post_id', 'post_id'),))
        self.obj.columns = [key for key in data.keys()]
        self.obj.set(data)

    def delete(self, data=None):
        return self._dispatch(self._delete, data)

    def _delete(self, data=None):
        """Delete the comment named by 'comment_id' when the caller is management or leads its author.

        NOTE(review): the source's indentation was lost; the delete call is
        assumed to sit inside the authorisation check - confirm against the
        original file.
        """
        if dict_key_verify(data, 'username') and self.authorised('management', lead_username=data['username'], mode="or"):
            self.obj.username = data['username']
        if not dict_key_verify(data, 'comment_id'):
            return
        lookup = content_info.comment(comment_id=data['comment_id'], items=['username']).get()
        # unwrap the 'comments' envelope when the lookup returns one
        if dict_key_verify(lookup, 'comments'):
            lookup = lookup['comments']
        if not dict_key_verify(lookup, 'username'):
            return
        author = lookup['username']
        if self.authorised('management', lead_username=author, mode="or"):
            self.obj.comment_id = data['comment_id']
            self.obj.delete()
class impression_handler(root_handler):
    """Shared logic for impression events on posts and comments.

    This class does not assign ``self.obj``; the concrete subclasses do.
    Each public method forwards a ``_``-prefixed worker to ``handle`` and
    returns ``(self.info, self.status)``.
    """

    def get(self, data=None):
        self.handle(self._get, data)
        return self.info, self.status

    def _get(self, data=None):
        """Fetch an impression using the optional 'impression_id' and 'items' selectors."""
        if dict_key_verify(data, 'impression_id'):
            self.obj.impression_id = data['impression_id']
        if dict_key_verify(data, 'items'):
            self.obj.columns = data['items']
        self.info = self.obj.get()

    # get_comment and get_post share the _get_content worker
    def get_comment(self, data=None):
        self.handle(self._get_content, data)
        return self.info, self.status

    def get_post(self, data=None):
        self.handle(self._get_content, data)
        return self.info, self.status

    def _get_content(self, data=None):
        """Fetch the impressions attached to a post or comment."""
        if dict_key_verify(data, 'username'):
            # use this handler's own check: it knows the calling user, and its
            # keyword arguments are the ones passed here
            if self.authorised(level='management', lead_username=data['username'], mode='or'):
                self.obj.username = data['username']
        if dict_key_verify(data, 'post_id'):
            self.obj.post_id = data['post_id']
        if dict_key_verify(data, 'comment_id'):
            self.obj.comment_id = data['comment_id']
        if dict_key_verify(data, 'items'):
            self.obj.columns = data['items']
        if dict_key_verify(data, 'types'):
            self.obj.types = data['types']
        self.info = self.obj.get_content()

    def count(self, data=None):
        self.handle(self._count, data)
        return self.info, self.status

    def _count(self, data=None):
        """Count impressions, optionally filtered by type, post and comment."""
        if dict_key_verify(data, 'username'):
            if self.authorised(level='management', lead_username=data['username'], mode='or'):
                self.obj.username = data['username']
        if dict_key_verify(data, 'impression_type'):
            self.obj.impression_type = data['impression_type']
        if dict_key_verify(data, 'post_id'):
            self.obj.post_id = data['post_id']
        if dict_key_verify(data, 'comment_id'):
            self.obj.comment_id = data['comment_id']
        self.info = self.obj.count(data)

    def set(self, data=None):
        self.handle(self._set, data)
        return self.info, self.status

    def _set(self, data=None):
        """Create an impression; only management may act as another 'username'."""
        if dict_key_verify(data, 'username') and self.management:
            self.obj.username = data['username']
        if dict_key_verify(data, 'impression_type'):
            self.obj.impression_type = data['impression_type']
        if dict_key_verify(data, 'post_id'):
            self.obj.post_id = data['post_id']
        if dict_key_verify(data, 'comment_id'):
            self.obj.comment_id = data['comment_id']
        self.info = self.obj.set(data)

    def delete(self, data=None):
        self.handle(self._delete, data)
        return self.info, self.status

    def _delete(self, data=None):
        """Delete the impression named by 'impression_id' if the caller leads its owner or is management/admin.

        NOTE(review): the source's indentation was lost; the lookup and delete
        are placed inside the 'impression_id' check here - confirm against the
        original file.
        """
        if dict_key_verify(data, 'impression_id'):
            self.obj.impression_id = data['impression_id']
            username = self.obj.get()['username']
            if self.is_leader(self.obj.id, username) or self.admin or self.management:
                self.info = self.obj.delete(data)
class post_impression_handler(impression_handler):
    """Impression handler bound to ``content_info.post_impression``."""

    def __init__(self, sio, sid, session, min_level='admin', event_name='post impression event', *args, **kwargs):
        # forward event_name so "not authorised" messages name this event
        # instead of the generic 'event'
        super().__init__(sio, sid, session, min_level=min_level, event_name=event_name, *args, **kwargs)
        self.obj = content_info.post_impression(user_id=self.user_id)
class comment_impression_handler(impression_handler):
    """Impression handler bound to ``content_info.comment_impression``."""

    def __init__(self, sio, sid, session, min_level='admin', event_name='comment impression event', *args, **kwargs):
        # forward event_name so "not authorised" messages name this event
        # instead of the generic 'event'
        super().__init__(sio, sid, session, min_level=min_level, event_name=event_name, *args, **kwargs)
        self.obj = content_info.comment_impression(user_id=self.user_id)
class notification_handler(root_handler):
    """Handler for notification events, backed by ``content_info.notification``.

    ``get`` returns ``(self.info, self.status)``; ``create``, ``delete`` and
    ``remove`` return ``self.status`` only.
    """

    def __init__(self, sio, sid, session, min_level='admin', event_name='notification event', *args, **kwargs):
        super().__init__(sio, sid, session, min_level=min_level, event_name=event_name, *args, **kwargs)
        self.obj = content_info.notification(user_id=self.user_id)

    def get(self, data=None):
        self.handle(self._get, data)
        return self.info, self.status

    def _get(self, data=None):
        """Fetch notifications; only management may target another 'username'."""
        if dict_key_verify(data, 'username') and self.management:
            self.obj.username = data['username']
        self.info = self.obj.get(data)
        if self.info:
            status("INFO", "Notification(s) successfully fetched", self.statface)
        else:
            status("FAIL", "Notification(s) unable to be fetched, something went wrong", self.statface)

    def create(self, data=None):
        self.handle(self._create, data)
        return self.status

    def _create(self, data=None):
        """Create a notification for the group named by 'target_id'.

        Management may notify any target. Other callers must be a leader of
        the target, and only when the target is a team or a user.
        """
        allowed = False
        if dict_key_verify(data, 'target_id'):
            target_info = self.obj.get_target_group({'target_id': data['target_id']})
            # a real membership test: `x == "team" or "user"` is always truthy
            leadable_target = target_info['type'] in ("team", "user")
            if (leadable_target and self.is_leader(self.user_id, target_info['id'])) or self.management:
                allowed = True
        if allowed:
            self.obj.create(data)
        else:
            status("FAIL", "Unable to create notification, you are unauthorised for this action", self.statface)

    def delete(self, data=None):
        self.handle(self._delete, data)
        return self.status

    def _delete(self, data=None):
        """Call ``self.obj.delete(data)``; only management may act as another 'username'."""
        if dict_key_verify(data, 'username') and self.management:
            self.obj.username = data['username']
        self.obj.delete(data)

    def remove(self, data=None):
        self.handle(self._remove, data)
        return self.status

    def _remove(self, data=None):
        """Call ``self.obj.remove(data)`` for the optional 'notification_id'."""
        if dict_key_verify(data, 'username') and self.management:
            self.obj.username = data['username']
        if dict_key_verify(data, 'notification_id'):
            self.obj.notification_id = data['notification_id']
        self.obj.remove(data)
class post_slot_handler(root_handler):
    """Handler that reports the current post slot, backed by ``timestamp()``."""

    def __init__(self, sio, sid, session, min_level='admin', event_name='post slot event', *args, **kwargs):
        super().__init__(sio, sid, session, min_level=min_level, event_name=event_name, *args, **kwargs)
        self.obj = timestamp()

    def get(self, data=None):
        """Return ``(info, status)`` where info holds the slot start, end and date.

        All three info values stay ``None`` when no slot could be fetched.
        ``data`` is ignored and this method does not go through ``handle``.
        """
        self.info = dict.fromkeys(('post_slot_start', 'post_slot_end', 'date'))
        slot = self.obj.get_slot()
        if not slot:
            status("CRIT", "Unable to fetch post slot, something went wrong, please contact administrator", self.statface)
            return self.info, self.status
        self.info['post_slot_start'] = slot['start']
        self.info['post_slot_end'] = slot['end']
        self.info['date'] = self.obj.date
        status("INFO", "Successfully fetched post slot", self.statface)
        return self.info, self.status
class server(root_handler):
    """Handler for server control events; currently the staged shutdown.

    Can be built without a client (falsy ``sid``) for an internal shutdown,
    in which case the ``root_handler`` initialisation is skipped.
    """

    # NOTE(review): the default event_name looks copied from post_slot_handler;
    # left unchanged so callers see the same default.
    def __init__(self, sio, sid, session, min_level='admin', event_name='post slot event', *args, **kwargs):
        if sid:
            super().__init__(sio, sid, session, min_level=min_level, event_name=event_name, *args, **kwargs)
        else:
            # no client context: still provide the attributes the shutdown
            # helpers read (sio for disconnects, statface for status calls)
            self.sio = sio
            self.sid = sid
            self.info = None
            self.status = None
            self.statface = None
        self.session = session
        self.clients = self.session.clients
        self.logged_in = self.session.logged_in
        try:
            # to handle for the case where the database hasnt been decrypted but an internal shutdown has been called
            self.obj = content_info.notification()
        except Exception:
            # best effort: shutdown must still work without notifications
            self.obj = None

    def _estimate_shutdown_time(self):
        """Return the estimated seconds needed to shut down, derived from the client counts."""
        estimate = (len(self.clients) - len(self.logged_in)) + len(self.logged_in)*2 + 10
        return estimate

    def _notify_clients(self, shutdown_by):
        """Create a shutdown notification for all clients that expires just before ``shutdown_by``."""
        if self.obj is None:
            log("WARN", "Shutdown notifications not sent, notification interface unavailable")
            return
        server_code = config_read("miscellaneous", "ServerCode")
        notif_data = {'target_id': "all-"+server_code, 'expire_after':shutdown_by-timestamp().now-1, 'title': "Server Shutting Down", 'content': "Its recomended to disconnect"}
        self.obj.create(notif_data)
        status("INFO", "Shutdown notifications sent", self.statface)

    def _shutdown_services(self):
        """Report background services as shut down (no service is stopped here)."""
        # the below is in the documentation but doesnt work
        #self.sio.shutdown()
        status("INFO", "Background user and server services shutdown", self.statface)

    def _close_new_clients(self):
        """Mark the session as no longer accepting new clients."""
        self.session.accepting_clients = False
        status("INFO", "Server closed to new clients", self.statface)

    def _disconnect_clients(self):
        """Disconnect every connected client."""
        status("INFO", "Disconnecting clients, bye", self.statface)
        # iterate over a snapshot in case disconnecting mutates the client
        # collection - presumably it does; verify against the disconnect handler
        for client in list(self.clients):
            self.sio.disconnect(client)

    def shutdown(self, data=None):
        self.handle(self._shutdown, data)
        return self.info, self.status

    def _shutdown(self, data=None):
        """Run the staged shutdown.

        ``data['time']`` is the number of seconds until shutdown (30 when
        absent). Clients are notified first; services and new connections are
        closed shortly before the final stage; then all clients are
        disconnected and the database is encrypted if configured.
        """
        log("WARN", "Server recieved shutdown signal")
        tasks = {'notifs': False, 'disconnect_clients': False ,'background_services': False, 'close_new_clients': False}
        if dict_key_verify(data, "time"):
            shutdown_by = timestamp().now + float(data['time'])
        else:
            shutdown_by = timestamp().now + 30
            status("WARN", "No shutdown time was provided, shutting down in 30 seconds", self.statface)
        estimate = self._estimate_shutdown_time()
        while timestamp().now < shutdown_by - estimate:
            if not tasks['notifs']:
                self._notify_clients(shutdown_by)
                tasks['notifs'] = True
            # NOTE(review): this stage could never run before because it
            # required the task to already be done. It now runs once, when the
            # deadline is within 1.2x the estimate - confirm the intended timing.
            if timestamp().now >= shutdown_by - estimate*1.2 and not tasks['background_services']:
                self._shutdown_services()
                tasks['background_services'] = True
                self._close_new_clients()
                tasks['close_new_clients'] = True
            eventlet.sleep(1)
        status("WARN", "Beginning final shutdown process, disconnect client", self.statface)
        if not tasks['background_services']:
            self._shutdown_services()
        if not tasks['close_new_clients']:
            self._close_new_clients()
        self._disconnect_clients()
        log("INFO", "Server finished pre-shutdown process calling stop")
        if config_read("database", "encrypt"):
            if not self.session.db_encrypted:
                db_encrpytion(self.session).encrypt()
            else:
                log("WARN", "Not encrypting database, database was never decrypted")
        #self.sio.shutdown()

    def internal_shutdown(self, data):
        """Run the shutdown without a client: no status interface and no ``handle`` checks."""
        self.statface = None
        self._shutdown(data)
class encryption_handler():
    """Entry point for decrypting the database through ``db_encrpytion``."""

    def __init__(self, session):
        self.obj = db_encrpytion(session)
        self.session = session
        self.statface = None

    def decrypt(self, data=None):
        """Try to decrypt the database with ``data``.

        Returns:
            True on success, False when ``data`` is empty or decryption fails.
        """
        if not data:
            log("FAIL", "Could not decrypt database")
            return False
        return self._decrypt(data)

    def _decrypt(self, data):
        """Decrypt using a 'password', or using 'shares' when Shamir secret sharing is enabled.

        Returns False without logging when neither credential is usable.
        """
        sss_enabled = config_read("database", "ShamirSecretSharing")
        flags = []
        if sss_enabled and dict_key_verify(data, "shares") and not dict_key_verify(data, "password"):
            flags = ["sss"]
        if not (dict_key_verify(data, "password") or ("sss" in flags)):
            return False
        try:
            if dict_key_verify(data, "password"):
                # validation only: a password that is not an integer is a failure
                int(data['password'])
            success = self.obj.decrypt(data, flags)
        except Exception:
            # Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit are not swallowed
            log("FAIL", "Something went wrong with decrypting the database")
            return False
        if success:
            return True
        log("FAIL", "Something went wrong while decrypting the database")
        return False
# TESTING RBP
def test():
    """Placeholder run when this module is executed directly; does nothing."""
    return None
# TESTING RBP
if __name__ == "__main__":
    test()