# BEFORE PRODUCTION PUSH
### Need to uncomment the try and except built into functions:
#### login, register, admin_register
"""Registration and login helpers.

Provides credential and token based login, user and admin registration, and
a session level check. Each helper object carries a ``status`` that the
public functions return to the caller.
"""

import sqlite3
import time
from string import ascii_letters, ascii_lowercase, digits

### MODULES
from modules.track import *
from modules.user.generate import main as user_generate
from modules.user import info as user_info
from modules.data.database import connect as db_connect
from modules.data.config import read as config_read
from modules.data.datetime import timestamp
from modules.algorithms.uuid import long_hash as hash_string
from modules.algorithms.uuid import generate as uuid_generate
from modules.algorithms.univ import char_check
### MODULES

# need to change this to path
database_name = config_read("database", "Path")


class reg_cred():
    """Validate a registration request and create the user.

    NOTE(review): ``self.db.create(self)`` presumably attaches a cursor as
    ``self.cur`` and ``status_update(self)`` presumably sets ``self.status``
    (a mapping with a ``'level'`` key); both are used below but defined in
    project modules not visible here -- confirm.
    """

    def __init__(self, cred):
        """Store the submitted credentials and open a database handle.

        ``cred`` must contain 'key', 'username', 'password' and 'repassword'.
        """
        self.level = config_read("user", "DefaultLevel")
        self.key = cred['key']
        self.username = cred['username']
        self.password = cred['password']
        self.repassword = cred['repassword']

        self.db = db_connect()
        self.db.create(self)
        logging.status("INFO", "registration initialised").status_update(self)

    def exec(self):
        """Run every check in order and, if all pass, generate the user."""
        # CHECKS
        check_processes = [
            self.username_verify,
            self.username_bans,
            self.username_clash_check,
            self.password_verify,
        ]
        for check in check_processes:
            check()
            # Stop at the first failed check; later checks assume the
            # earlier ones passed (e.g. that the username is not None).
            if self.status['level'] == "FAIL":
                return
        if not self.key_verify():
            return
        logging.status("INFO", "credential verification successful").status_update(self)
        # CHECKS

        self.id = user_generate(self.username, self.password, self.level)
        # NOTE(review): the connection is left open here; the close() call
        # below was already disabled in the original -- confirm why.
        #self.db.close()
        logging.status("INFO", "registration successful").status_update(self)

    def username_verify(self):
        """Record a FAIL status if the username is missing, the wrong length
        or rejected by ``char_check``."""
        # This will be configurable
        min_len = 3
        max_len = 25
        if self.username is None:
            logging.status("FAIL", "username cannot be null").status_update(self)
        elif len(self.username) < min_len or len(self.username) > max_len:
            logging.status("FAIL", f"username cant be shorter than {min_len} characters or longer than {max_len} characters").status_update(self)
        # NOTE(review): char_check's return type is not visible here, so the
        # explicit comparison with True is kept as written.
        elif char_check(self.username, ascii_letters + digits + "_" + "-") == True:
            logging.status("FAIL", "username contains invalid characters").status_update(self)

    def username_bans(self):
        """Record a FAIL status if the username contains the server code."""
        servercode = config_read('miscellaneous', 'servercode')
        if servercode in self.username:
            logging.status("FAIL", "usernames contains servercode").status_update(self)

    def username_clash_check(self):
        """Record a FAIL status if the username already exists."""
        self.cur.execute("SELECT username FROM auth_credentials WHERE username = ?", (self.username,))
        if self.cur.fetchall():
            logging.status("FAIL", "username is already in use").status_update(self)

    def password_verify(self):
        """Record a FAIL status if the password is missing, the wrong length
        or does not match the repeated password."""
        # This will be configurable
        min_len = 4
        max_len = 100
        if self.password is None:
            logging.status("FAIL", "password cannot be null").status_update(self)
        elif len(self.password) < min_len or len(self.password) > max_len:
            logging.status("FAIL", f"password cant be shorter than {min_len} characters or longer than {max_len}").status_update(self)
        elif self.password != self.repassword:
            logging.status("FAIL", "passwords do not match").status_update(self)

    def key_verify(self):
        """Return True if the submitted key matches the registration key.

        On a mismatch a FAIL status is recorded first, so that ``exec``
        does not return with the previous non-failure status still set.
        """
        if self.key == config_read('authorisation', 'RegistrationKey'):
            return True
        logging.status("FAIL", "registration code is incorrect").status_update(self)
        return False


class reg_admin(reg_cred):
    """Registration of an admin user, checked against the admin key."""

    def __init__(self, cred):
        """Initialise like ``reg_cred`` and force the level to "admin"."""
        super().__init__(cred)
        self.level = "admin"
        logging.status("INFO", "admin registration initialised").status_update(self)

    def key_verify(self):
        """Return True if the submitted key matches the admin key.

        Unlike the base class this does not record a status; the caller
        (``admin_register``) reports the failure itself.
        """
        return self.key == config_read('authorisation', 'AdminKey')

    def first_time(self):
        """Return True if no user with this level exists yet."""
        self.cur.execute("SELECT user_id FROM auth_credentials WHERE level = ?", (self.level,))
        value = self.cur.fetchone()
        return not value


class login_cred():
    """Login with a username and password; issues a token on success."""

    def __init__(self, sio, sid, cred):
        """Store the credentials and the socket handle/session id used to
        send the token back, then open a database handle."""
        self.username = cred['username']
        self.password = cred['password']
        self.sio = sio
        self.sid = sid

        self.db = db_connect()
        self.db.create(self)
        logging.status("INFO", "credential login initialised").status_update(self)

    def exec(self):
        """Look up the user by username and password hash.

        Sets ``self.id`` to the user id (or None when nothing matches) and,
        on a match, creates and sends a login token. The database handle is
        closed even if a step raises.
        """
        try:
            self.process_password()
            self.cur.execute(
                "SELECT user_id FROM auth_credentials WHERE username = ? AND password = ?",
                (self.username, self.password_hash),
            )
            row = self.cur.fetchone()
            self.id = row[0] if row else None
            if self.id:
                logging.status("INFO", "valid login credentials").status_update(self)
                login_token.create_token(self)
                login_token.send_token(self)
                logging.status("INFO", "login successful").status_update(self)
            else:
                logging.status("FAIL", "invalid login credentials").status_update(self)
        finally:
            self.db.close()

    def process_password(self):
        """Set ``self.password_hash`` to the hash of password + user id, or
        to None when the username is unknown."""
        self.cur.execute("SELECT user_id FROM auth_credentials WHERE username = ?", (self.username,))
        user_id = self.cur.fetchone()
        if user_id:
            self.password_hash = hash_string(self.password + user_id[0])
        else:
            self.password_hash = None


class login_token():
    """Login with a previously issued token."""

    def __init__(self, cred):
        """Store the submitted token and open a database handle."""
        self.token = cred['token']

        self.db = db_connect()
        self.db.create(self)
        logging.status("INFO", "token login initialised").status_update(self)

    def exec(self):
        """Validate the token against the stored hash and expiry time.

        The database handle is closed even if a step raises.
        NOTE(review): for an expired token ``self.id`` keeps the stored
        user id while the status is FAIL -- callers must check the status.
        """
        try:
            self.token_hash = hash_string(self.token)
            self.cur.execute("SELECT user_id, token_expire FROM auth_tokens WHERE token = ?", (self.token_hash,))
            fetch_data = self.cur.fetchall()
            self.id = None
            if fetch_data:
                self.id, self.token_expire = fetch_data[0][0], fetch_data[0][1]
                if self.token_expire > timestamp().now:
                    logging.status("INFO", "valid token").status_update(self)
                else:
                    logging.status("FAIL", "invalid token").status_update(self)
            else:
                logging.status("FAIL", "invalid token").status_update(self)
        finally:
            self.db.close()

    @staticmethod
    def create_token(self):
        """Generate a token for ``self`` and store its hash and expiry.

        Static on purpose: ``self`` is whichever login object is passed in
        (``login_cred`` calls this with itself); it must provide ``id``,
        ``cur`` and ``db``.
        """
        expire_time = float(config_read("authorisation", "tokenexpirytime"))
        self.token = uuid_generate()
        self.token_hash = hash_string(self.token)
        self.token_expire = timestamp().now + expire_time
        ### ALL NEEDS CHANGING
        self.cur.execute(
            "INSERT INTO auth_tokens (user_id, token, token_expire) VALUES (?, ?, ?)",
            (self.id, self.token_hash, self.token_expire),
        )
        self.db.commit()
        logging.status("INFO", "authentication token created").status_update(self)

    @staticmethod
    def send_token(self):
        """Emit the plain token and its expiry to the client's room."""
        self.sio.emit('recv_token', {'token': self.token, 'expire': self.token_expire}, room=self.sid)
        logging.status("INFO", "token sent").status_update(self)


class error_process():
    """Placeholder client used when no real login/registration object exists."""

    def __init__(self):
        logging.status("WARNING", "something went wrong").status_update(self)
        self.id = None


def _fail(client, message):
    """Record a FAIL status with ``message`` and return the client.

    If ``client`` is None (its constructor raised before it was bound) an
    ``error_process`` placeholder is created to carry the status.
    """
    if client is None:
        client = error_process()
    logging.status("FAIL", message).status_update(client)
    return client


def login(sio, sid, cred):
    """Log a client in by token or by username/password.

    Returns a ``(status, user_id, level)`` tuple.
    """
    client = None
    if "token" in cred:
        try:
            client = login_token(cred)
            client.exec()
        except Exception:
            client = _fail(client, "token not authorised")
    elif all(param in cred for param in ['username', 'password']):
        try:
            client = login_cred(sio, sid, cred)
            client.exec()
        except Exception:
            client = _fail(client, "login failed")
    else:
        client = error_process()
        logging.status("FAIL", "no credentials provided").status_update(client)

    # exec() may have raised before assigning an id.
    user_id = getattr(client, "id", None)
    client.level = user_info.level(user_id=user_id).get()
    if client.level:
        client.level = client.level['level']
    return client.status, user_id, client.level


def register(cred):
    """Register a normal user; returns the resulting status."""
    client = None
    if all(param in cred for param in ['username', 'password', 'repassword', 'key']):
        try:
            client = reg_cred(cred)
            client.exec()
        except Exception:
            client = _fail(client, "registration failed")
    else:
        client = error_process()
        logging.status("FAIL", "no credentials provided").status_update(client)
    return client.status


def admin_register(cred):
    """Register the first admin user; returns the resulting status.

    Registration only proceeds when the admin key matches and no admin
    exists yet.
    """
    client = None
    if all(param in cred for param in ['username', 'password', 'repassword', 'key']):
        try:
            client = reg_admin(cred)
            if client.key_verify() and client.first_time():
                client.exec()
            else:
                logging.status("FAIL", "admin key does not match/admin already exists").status_update(client)
        except Exception:
            client = _fail(client, "registration failed")
    else:
        client = error_process()
        logging.status("FAIL", "no credentials provided").status_update(client)
    return client.status


def authorised(sio, sid, min_level='admin'):
    """Return True if the session's level is at least ``min_level``.

    Levels are ordered member < management < admin. Raises ValueError if
    ``min_level`` is not one of them.
    """
    level_list = ['member', 'management', 'admin']
    allow_levels = level_list[level_list.index(min_level):]
    level = sio.get_session(sid)['level']
    return level in allow_levels


def main():
    error_process()


if __name__ == "__main__":
    main()