from modules.track.logging import log, status from modules.data.config import read as config_read from modules.data.database import retrieve from modules.data.database import connect as db_connect from modules.data.datetime import timestamp from modules.algorithms.uuid import generate as uuid_generate from modules.algorithms.univ import dict_key_verify from modules.algorithms.recomend import recomend_friend class table(): def __init__(self, user_id=None, username=None, occupation_id=None, allowed_columns=None, *args, **kwargs): self.statface = None self.db = db_connect() self.db.create(self) self.id = user_id self.username = username self.occupation_id = occupation_id if not self.allowed_columns: self.allowed_columns = allowed_columns self.columns = self.allowed_columns self.server_code = config_read('miscellaneous', 'servercode') @property def id(self): return self._id @id.setter def id(self, value): if type(value) == str: self.cur.execute("SELECT username FROM auth_credentials WHERE user_id = ?", (value,)) if not self.cur.fetchone(): value = None else: value = None self._id = value @property def username(self): return self._username @username.setter def username(self, value): self.cur.execute("SELECT user_id FROM auth_credentials WHERE username = ?", (value,)) if not self.cur.fetchone(): value = None self._username = value if value: u_id = user_id(username=value).get()['user_id'] if self.id != u_id: self.id = u_id @property def occupation_id(self): return self._occupation_id @occupation_id.setter def occupation_id(self, value): self.cur.execute("SELECT name FROM occupations WHERE occupation_id = ?", (value,)) if not self.cur.fetchone(): value = None self._occupation_id = value @property def team_id(self): return self._team_id @team_id.setter def team_id(self, value): self.cur.execute("SELECT name FROM teams WHERE team_id = ?", (value,)) if not self.cur.fetchone(): value = None self._team_id = value @property def columns(self): return self._columns @columns.setter def columns(self, value): valid = [] if type(value) == list: for column in value: if column in self.allowed_columns: valid.append(column) self._columns = valid @property def date(self): self._date = timestamp().date return self._date @date.setter def date(self, value): self._date = value class user_id(): def __init__(self, username=None, *args, **kwargs): self.username = username self.db = db_connect() self.db.create(self) def get(self): info = {'user_id':None} self.cur.execute(f"SELECT user_id FROM auth_credentials WHERE username = ?", (self.username,)) rez = self.cur.fetchone() if rez: info = {"user_id":rez[0]} else: info = None return info class auth(table): def __init__(self, user_id=None, username=None, *args, **kwargs): self.allowed_columns = ["username", "level"] super().__init__(user_id=user_id, username=username) def get(self): info = {} for column in self.columns: info[column] = None self.cur.execute(f"SELECT {column} FROM auth_credentials WHERE user_id = ?", (self.id,)) rez = self.cur.fetchone() if rez: info[column] = rez[0] if not self.id: status("FAIL", "Invalid username provided", self.statface) info = None else: status("INFO", "Authorisation info successfully fetched", self.statface) return info def set(self, data): for column in self.columns: value = data[column] rez = None if column == 'username': # the select statement was here instead of update i have no idea why # ive replaced it with an update since updating a username is completetly fine #self.cur.execute("SELECT username FROM auth_credentials WHERE username = ?", (value,)) self.cur.execute("UPDATE auth_credentials SET username = ? WHERE user_id = ?", (value, self.id)) status("INFO", "Successfully changed username", self.statface) if column == 'level': self.cur.execute("UPDATE auth_credentials SET level = ? WHERE user_id = ?", (value ,self.id)) status("INFO", "Successfully changed level", self.statface) self.db.commit() # V RBP: I think this is depricated and no longer in use class level(auth): def __init__(self, user_id): super().__init__(user_id=user_id) self.columns = ["level"] # ^ RBP: I think this is depricated and no longer in use class team(table): def __init__(self, user_id=None, username=None, occupation_id=None, team_id=None, *args, **kwargs): self.allowed_columns = ['team_id', 'name', 'occupation_id', 'user_id'] super().__init__() if user_id: self.id = user_id if username: self.username = username if occupation_id: self.occupation_id = occupation_id if team_id: self.team_id = team_id @property def id(self): return self._id @id.setter def id(self, value): occupation_value = None self.cur.execute("SELECT username FROM auth_credentials WHERE user_id = ?", (value,)) if not self.cur.fetchone(): value = None else: self.cur.execute("SELECT occupation_id FROM profile WHERE user_id = ?", (value,)) rez = self.cur.fetchone() if rez: occupation_value = rez[0] self.occupation_id = occupation_value self._id = value @property def occupation_id(self): return self._occupation_id @occupation_id.setter def occupation_id(self, value): team_value = None self.cur.execute("SELECT name FROM occupations WHERE occupation_id = ?", (value,)) if not self.cur.fetchone(): value = None else: self.cur.execute("SELECT team_id FROM teams WHERE occupation_id = ?", (value,)) rez = self.cur.fetchone() if rez: team_value = rez[0] self.team_id = team_value self._occupation_id = value def get(self): info = {column: None for column in self.columns} for column in self.columns: self.cur.execute(f"SELECT {column} FROM teams WHERE team_id = ?", (self.team_id,)) rez = self.cur.fetchone() if rez: info[column] = rez[0] if not all(info.values()) and not(self.team_id): info = None status("FAIL", "Team data could not be fetched, invalid data provided", self.statface) else: status("INFO", "Team data successfully fetched", self.statface) return info def get_all(self): info = {'teams': None} for column in self.columns: self.cur.execute(f"SELECT {column} FROM teams WHERE user_id IS NULL") rez = self.cur.fetchall() if rez: if not info['teams']: info['teams'] = [{} for i in range(len(rez))] for i, items in enumerate(rez): info['teams'][i][column] = items[0] status("INFO", "Team(s) successfully fetched", self.statface) else: status("FAIL", "Team(s) could not be fetched, something went wrong", self.statface) return info def get_members(self): info = {'members': None} self.cur.execute("""SELECT auth_credentials.username FROM auth_credentials INNER JOIN profile USING(user_id) CROSS JOIN teams ON profile.occupation_id = teams.occupation_id WHERE teams.team_id=?""", (self.team_id,)) rez = self.cur.fetchall() if rez: info['members'] = [{'username': member[0]} for member in rez] status("INFO", "Team members successfully fetched", self.statface) if not self.team_id: status("FAIL", "Team members could not be fetched, invalid data provided") info = None return info def get_leaders(self): info = {'leaders': None} self.cur.execute("SELECT user_id FROM team_leaders WHERE team_id = ?", (self.team_id,)) rez = self.cur.fetchall() if rez: info['leaders'] = [{'username':(auth(user_id=user_id).get())['username'] for user_id in leader} for leader in rez] status("INFO", "Team leaders successfully fetched", self.statface) else: status("FAIL", "Team leaders could not be fetched, invalid data provided", self.statface) return info def set(self, data): for column in self.columns: if column == "name" and dict_key_verify(data, 'name'): self.cur.execute("UPDATE teams SET name=? where team_id=?", (data['name'] ,self.team_id)) self.db.commit() status("INFO", "Team data successfully changed", self.statface) if dict_key_verify(data, 'leaders'): current_leaders = (self.get_leaders())['leaders'] for leader in data['leaders']: exists = False if current_leaders: for current_leader in current_leaders: if current_leader['username'] == leader['username']: exists = True if not exists: self.cur.execute("SELECT user_id FROM auth_credentials WHERE username = ?", (leader['username'],)) info = user_id(username=leader['username']).get() if info: self.cur.execute("INSERT INTO team_leaders (user_id, team_id) VALUES (?, ?)", (info['user_id'], self.team_id)) self.db.commit() status("INFO", "New leader successfully added to team", self.statface) else: status("FAIL", "Leader not set, user does not exist", self.statface) else: status("WARN", "This user already exists as a leader of the team", self.statface) def delete_leaders(self, data): leaders = data['leaders'] current_leaders = self.get_leaders()['leaders'] if type(leaders) == str: leaders = [leaders] for leader in leaders: exists = False if current_leaders: for current_leader in current_leaders: if current_leader['username'] == leader['username']: exists = True if exists: self.cur.execute("DELETE FROM team_leaders WHERE user_id=? AND team_id=?", (user_id(username=leader['username']).get()['user_id'],self.team_id,)) self.db.commit() status("INFO", "User {leader['username']} removed as a leader from this team", self.statface) else: status("WARN", "User {leader['username']} does not exist as a leader of this team", self.statface) class friend(table): @property def friend_username(self): return self._friend_username @friend_username.setter def friend_username(self, value): obj = friend(username=value) if not obj.username: value = None else: self.friend_id = user_id(username=value).get()['user_id'] self._friend_username = value @property def friend_id(self): return self._friend_id @friend_id.setter def friend_id(self, value): obj = friend(user_id=value) if not obj.id: value = None self._friend_id = value @property def mode(self): return self._mode @mode.setter def mode(self, value): if value not in ['incoming', 'outgoing']: value = "incoming" self._mode = value def __init__(self, user_id=None, username=None, *args, **kwargs): self.allowed_columns = ['username', 'friend_username'] self.mode = "outgoing" super().__init__(user_id=user_id, username=username) def get(self): info = {'friends':None} self.cur.execute("SELECT friend_id FROM friends WHERE user_id = ? AND approved = ?", (self.id, True)) rez = self.cur.fetchall() info['friends'] = [auth(user_id=user[0]).get() for user in rez] if not self.id: info = None status("FAIL", "Friends not fetched, invalid data provided", self.statface) else: status("INFO", "Friends successfully fetched", self.statface) return info def get_requests(self): info = {'requests': None} if self.mode == 'incoming': self.cur.execute("SELECT user_id FROM friends WHERE friend_id = ? AND approved = ?", (self.id, False)) else: self.cur.execute("SELECT friend_id FROM friends WHERE user_id = ? AND approved = ?", (self.id, False)) rez = self.cur.fetchall() if rez: users = [auth(user_id=user[0]).get()['username'] for user in rez] info['requests'] = users status("INFO", f"Successfully fetched {self.mode} friend request(s)", self.statface) elif not self.id: status("FAIL", f"Could not fetch {self.mode} friend request(s), invalid data provided", self.statface) info = None return info def get_recomendations(self, data): info = {'recomended': None} if dict_key_verify(data, 'amount') and isinstance(data['amount'], int): amount = data['amount'] else: status("FAIL", "Could not fetch friend recomendation(s), invalid amount provided or data is in wrong format", self.statface) return None depth = 3 username = auth(user_id=self.id).get()['username'] recomendations = recomend_friend(username, amount, depth) if recomendations: info['recomended'] = recomendations status("INFO", "Successfully fetched friend recomendations", self.statface) else: status("INFO", "No recomendations could be generated, add some friends first", self.statface) return info def add_request(self, data): approved = False if dict_key_verify(data, 'friend_username'): self.friend_username = data['friend_username'] friend_id = user_id(data['friend_username']).get()['user_id'] if friend_id: # checks if the other person has added them as a friend # if so it accepts the other persons request and creates their own approved request self.cur.execute("SELECT user_id FROM friends WHERE friend_id = ? AND user_id = ?", (self.id, friend_id)) rez = self.cur.fetchone() if rez: self.cur.execute("UPDATE friends SET approved = True WHERE friend_id = ? AND user_id = ?", (self.id, friend_id)) approved = True # checks to see if this friend request already exists (wether accepted or rejected) # if not then it makes a new unaproved friend request self.cur.execute("SELECT approved FROM friends WHERE user_id = ? AND friend_id = ?", (self.id, friend_id)) rez = self.cur.fetchone() if not rez: self.cur.execute("INSERT INTO friends (user_id, friend_id, approved) VALUES (?, ?, ?)", (self.id, friend_id, approved)) status("INFO", "Friend request successfully created", self.statface) elif rez[0] == False: status("WARN", "User already has an active friend request to this user", self.statface) elif rez[0] == True: status("WARN", "User is already friends with other user", self.statface) else: status("FAIL", "Could not create friend request, invalid data provided") self.db.commit() def approve_request(self, data): if dict_key_verify(data, 'friend_username'): self.friend_username = data['friend_username'] self.cur.execute("SELECT approved FROM friends WHERE friend_id = ? AND user_id = ?", (self.id, self.friend_id)) rez = self.cur.fetchone() if rez: self.add_request(data) else: status("FAIL", "Friend request does not exist", self.statface) def reject_request(self, data): self.remove(data) def delete_request(self, data): self.remove(data) def remove(self, data): if dict_key_verify(data, 'friend_username'): self.friend_username = data['friend_username'] if self.friend_id: self.cur.execute("DELETE FROM friends WHERE user_id = ? AND friend_id = ?", (self.id, self.friend_id)) self.cur.execute("DELETE FROM friends WHERE friend_id = ? AND user_id = ?", (self.id, self.friend_id)) status("INFO", "Friend/friend request successfully removed/rejected", self.statface) else: status("FAIL", "Friend/friend request could not be removed/rejected, invalid data provided", self.statface) self.db.commit() class profile(table): @property def target_username(self): return self._target_username @target_username.setter def target_username(self, value): prof = profile(username=value) if not prof.username: value = None self._target_username = value def __init__(self, user_id=None, username=None, *args, **kwargs): self.allowed_columns = ["biography", "role", "name", "occupation_id"] super().__init__(user_id=user_id, username=username) def get(self): info = {} for column in self.columns: info[column] = None self.cur.execute(f"SELECT {column} FROM profile WHERE user_id = ?", (self.id,)) rez = self.cur.fetchone() if rez: info[column] = rez[0] if not self.id: status("FAIL", "Invalid username provided profile unable to be fetched") info = None else: status("INFO", "Profile infomation successfully fetched") return info def get_permissions(self): info = {"delete": False, "edit": False} subject = auth(user_id=self.id, items=['level', 'username']).get() if subject['level'] == "management" or subject['level'] == "admin": info['delete'] = True info['edit'] = True if self.target_username: if subject['username'] == self.target_username: info['delete'] = True info['edit'] = True target_team_info = team(username=self.target_username).get_leaders() if dict_key_verify(target_team_info, 'leaders'): target_leaders = target_team_info['leaders'] if subject['username'] in target_leaders: info['delete'] = True if not self.id or not self.target_username: status("FAIL", "Invalid username or data provided", self.statface) info = None else: status("INFO", "Permissions successfully fetched", self.statface) return info def set(self, data): for column in self.columns: item = data[column] self.cur.execute(f"UPDATE profile SET {column} = ? WHERE user_id = ?", (item, self.id)) status("INFO", "Successfully changed/deleted {column}", self.statface) self.db.commit() def delete(self): data = {} for column in self.columns: data[column] = None self.set(data) class occupation(table): def __init__(self, user_id=None, username=None, occupation_id=None, *args, **kwargs): self.allowed_columns = ["occupation_id", "name", "description"] super().__init__(user_id=user_id, username=username, occupation_id=occupation_id) def get(self): info = {column: None for column in self.columns} if not self.occupation_id: self.cur.execute("SELECT occupations.occupation_id, occupations.name, description FROM profile INNER JOIN occupations USING(occupation_id) WHERE user_id = ?", (self.id,)) else: self.cur.execute("SELECT occupation_id, name, description FROM occupations WHERE occupation_id = ?", (self.occupation_id,)) rez = self.cur.fetchone() if rez: occupation = {'occupation_id':rez[0], 'name':rez[1], 'description':rez[2]} for column in self.columns: info[column] = occupation[column] if not rez and not self.id: status("FAIL", "Occupation could not be fetched: invalid data provided", self.statface) info = None else: status("INFO", "Occupation successfully fetched", self.statface) return info def get_request(self): info = {'occupation_id': None, 'approved': None} self.cur.execute("SELECT occupation_id, approved FROM occupation_requests WHERE user_id = ?", (self.id,)) rez = self.cur.fetchone() if rez: info['occupation_id'] = rez[0] info['approved'] = rez[1] status("INFO", "Occupation requests fetched successfully") else: status("FAIL", "Occupation requests could not be fetched invalid data provided", self.statface) info = None return info def get_all_requests(self): info = {'requests': None} self.cur.execute("SELECT user_id, occupation_id FROM occupation_requests WHERE approved = ?", (False,)) rez = self.cur.fetchall() if rez: info['requests'] = [{'username': auth(user_id=request[0]).get()['username'], 'occupation_id': request[1]} for request in rez] status("INFO", "Occupation requests successfully fetched", self.statface) else: status("FAIL", "Occupation requests could not be fetched something went wrong", self.statface) return info def set(self, data): occupation_id = data["occupation_id"] self.cur.execute("SELECT name FROM occupations WHERE occupation_id = ?", (occupation_id,)) if self.cur.fetchone(): self.cur.execute("UPDATE profile SET occupation_id = ? WHERE user_id = ?", (occupation_id, self.id)) status("INFO", "Occupation successfully updated", self.statface) else: status("FAIL", "Occupation could not be updated invalid data provided", self.statface) self.db.commit() def set_request(self, data): occupation_id = data['occupation_id'] self.cur.execute("SELECT approved FROM occupation_requests WHERE user_id = ?", (self.id,)) if self.cur.fetchone(): self.delete_request() status("INFO", "Removing previous occupation change request", self.statface) self.cur.execute("SELECT name FROM occupations WHERE occupation_id = ?", (occupation_id,)) if self.cur.fetchone(): self.cur.execute("INSERT INTO occupation_requests (user_id, occupation_id, approved) VALUES (?, ?, ?)", (self.id, occupation_id, False)) else: status("FAIL", "Occupation change request could not be made invalid occupation_id provided", self.statface) self.db.commit() def approve_request(self): self.cur.execute("SELECT occupation_id FROM occupation_requests WHERE approved = ? AND user_id = ?", (False, self.id,)) if self.cur.fetchone(): self.cur.execute("UPDATE occupation_requests SET approved = ? WHERE user_id = ?", (True, self.id)) self.cur.execute("SELECT occupation_id FROM occupation_requests WHERE user_id = ?", (self.id,)) rez = self.cur.fetchone() if rez: self.set({'occupation_id': rez[0]}) status("INFO", "Occupation change request successfully approved", self.statface) else: status("CRIT", "Occupation change request approved but not changed in the user entry, contact admin", self.statface) else: status("FAIL", "Occupation change request from that user does not exist or has already been approved", self.statface) self.db.commit() def reject_request(self): self.delete_request() def delete(self): self.cur.execute("UPDATE profile SET occupation_id = ? WHERE user_id = ?", (None, self.id)) self.db.commit() status("INFO", "Occupation no longer associated with user", self.statface) def delete_request(self): self.cur.execute("DELETE FROM occupation_requests WHERE user_id = ?", (self.id,)) self.db.commit() status("INFO", "Occupation change request successfully deleted", self.statface) def get_all(self): info = {'occupations':None} self.cur.execute("SELECT occupation_id, name, description FROM occupations") rez = self.cur.fetchall() if rez: occupations = [{'occupation_id':occupation[0], 'name':occupation[1], 'description': occupation[2]} for occupation in rez] info['occupations'] = occupations status("INFO", "Occupation(s) successfully fetched", self.statface) else: status("FAIL", "Occupation(s) could not be fetched", self.statface) return info def create(self, data={'name': None, 'description': None}): occupation_uuid = uuid_generate() team_uuid = uuid_generate() name = data['name'] description = data['description'] self.cur.execute("INSERT INTO occupations(occupation_id, name, description) VALUES (?, ?, ?)", (occupation_uuid, name, description)) self.cur.execute("INSERT INTO teams (team_id, name, occupation_id) VALUES (?, ?, ?)", (team_uuid, name, occupation_uuid)) self.db.commit() def edit(self, data): if 'occupation_id' in data and not self.occupation_id: self.occupation_id = data['occupation_id'] for column in self.columns: if column == "occupation_id": continue value = data[column] self.cur.execute(f"UPDATE occupations SET {column} = ? WHERE occupation_id = ?", (value, self.occupation_id)) self.db.commit() def delete_occupation(self, data=None): if dict_key_verify(data, "occupation_id") and not self.occupation_id: self.occupation_id = data['occupation_id'] self.cur.execute("DELETE FROM occupations WHERE occupation_id = ?", (self.occupation_id,)) self.db.commit() def main(): log("WARN", "modules/user/info.py has been called as main. This file is not intended to run solo. Please use main.py or modules/handler/handler.py") if __name__ == "__main__": main()