This repository has been archived on 2025-02-10. You can view files and clone it, but cannot push or open issues or pull requests.
Files
beopen/server/modules/auth/auth.py
2025-02-10 12:37:33 +00:00

295 lines
9.5 KiB
Python

# BEFORE PRODUCTION PUSH
### Need to uncomment the try and exept build into fuctions:
#### login, register, admin_register
import sqlite3
import time
from string import ascii_letters, ascii_lowercase, digits
### MODULES
from modules.track import *
from modules.user.generate import main as user_generate
from modules.user import info as user_info
from modules.data.database import connect as db_connect
from modules.data.config import read as config_read
from modules.data.datetime import timestamp
from modules.algorithms.uuid import long_hash as hash_string
from modules.algorithms.uuid import generate as uuid_generate
from modules.algorithms.univ import char_check
### MODULES
# need to change this to path
database_name = config_read("database", "Path")
class reg_cred():
def __init__(self, cred):
self.level = config_read("user", "DefaultLevel")
self.key = cred['key']
self.username = cred['username']
self.password= cred['password']
self.repassword= cred['repassword']
self.db = db_connect()
self.db.create(self)
logging.status("INFO", "registration initialised").status_update(self)
def exec(self):
# CHECKS
check_processes = [self.username_verify, self.username_bans, self.username_clash_check, self.password_verify]
for check in check_processes:
check()
if self.status['level'] == "FAIL":
return
if not self.key_verify():
return
logging.status("INFO", "credential verification successful").status_update(self)
# CHECKS
self.id = user_generate(self.username, self.password, self.level)
#self.db.close()
logging.status("INFO", "registration successful").status_update(self)
def username_verify(self):
# This will be configurable
min_len = 3
max_len = 25
if self.username == None:
logging.status("FAIL", "username cannot be null").status_update(self)
elif len(self.username) < min_len or len(self.username) > max_len:
logging.status("FAIL", f"username cant be shorter than {min_len} characters or longer than {max_len} characters").status_update(self)
elif char_check(self.username, ascii_letters + digits + "_" + "-") == True:
logging.status("FAIL", f"username contains invalid characters").status_update(self)
def username_bans(self):
servercode = config_read('miscellaneous', 'servercode')
if servercode in self.username:
logging.status("FAIL", "usernames contains servercode").status_update(self)
def username_clash_check(self):
self.cur.execute("SELECT username FROM auth_credentials WHERE username = ?", (self.username,))
if self.cur.fetchall():
logging.status("FAIL", "username is already in use").status_update(self)
def password_verify(self):
# This will be configurable
min_len = 4
max_len = 100
if self.password == None:
logging.status("FAIL", "password cannot be null").status_update(self)
elif len(self.password) < min_len or len(self.password) > max_len:
logging.status("FAIL", f"password cant be shorter than {min_len} characters or longer than {max_len}").status_update(self)
elif self.password != self.repassword:
logging.status("FAIL", f"passwords do not match").status_update(self)
def key_verify(self):
if self.key == config_read('authorisation', 'RegistrationKey'):
return True
else:
return False
logging.status("FAIL", "registration code is incorrect").status_update(self)
class reg_admin(reg_cred):
def __init__(self, cred):
super().__init__(cred)
self.level = "admin"
logging.status("INFO", "admin registration initialised").status_update(self)
def key_verify(self):
if self.key == config_read('authorisation', 'AdminKey'):
return True
else:
return False
def first_time(self):
self.cur.execute("SELECT user_id FROM auth_credentials WHERE level = ?", (self.level,))
value = self.cur.fetchone()
if value:
return False
else:
return True
class login_cred():
def __init__(self, sio, sid, cred):
self.username = cred['username']
self.password = cred['password']
self.sio = sio
self.sid = sid
self.db = db_connect()
self.db.create(self)
logging.status("INFO", "credential login initialised").status_update(self)
def exec(self):
self.process_password()
self.cur.execute("SELECT user_id FROM auth_credentials WHERE username = ? AND password = ?", (self.username, self.password_hash))
self.id = self.cur.fetchone()
if self.id:
self.id = self.id[0]
logging.status("INFO", "valid login credentials").status_update(self)
login_token.create_token(self)
login_token.send_token(self)
logging.status("INFO", "login successful").status_update(self)
else:
logging.status("FAIL", "invalid login credentials").status_update(self)
self.db.close()
def process_password(self):
self.cur.execute("SELECT user_id FROM auth_credentials WHERE username = ?", (self.username,))
user_id = self.cur.fetchone()
if user_id:
self.password_hash = hash_string(self.password + user_id[0])
else:
self.password_hash = None
class login_token():
def __init__(self, cred):
self.token = cred['token']
self.db = db_connect()
self.db.create(self)
logging.status("INFO", "token login initialised").status_update(self)
def exec(self):
self.token_hash = hash_string(self.token)
self.cur.execute("SELECT user_id, token_expire FROM auth_tokens WHERE token = ?", (self.token_hash,))
fetch_data = self.cur.fetchall()
self.id = None
if fetch_data:
self.id, self.token_expire = fetch_data[0][0], fetch_data[0][1]
if self.token_expire > timestamp().now:
logging.status("INFO", "valid token").status_update(self)
else:
logging.status("FAIL", "invalid token").status_update(self)
else:
logging.status("FAIL", "invalid token").status_update(self)
self.db.close()
@staticmethod
def create_token(self):
expire_time = float(config_read("authorisation", "tokenexpirytime"))
self.token = uuid_generate()
self.token_hash = hash_string(self.token)
self.token_expire = timestamp().now + expire_time
### ALL NEEDS CHANGING
self.cur.execute("INSERT INTO auth_tokens (user_id, token, token_expire) VALUES (?, ?, ?)", (self.id, self.token_hash, self.token_expire))
self.db.commit()
logging.status("INFO", "authentication token created").status_update(self)
@staticmethod
def send_token(self):
self.sio.emit('recv_token', {'token':self.token, 'expire': self.token_expire}, room=self.sid)
logging.status("INFO", "token sent").status_update(self)
class error_process():
def __init__(self):
logging.status("WARNING", "something went wrong").status_update(self)
self.id = None
def login(sio, sid, cred):
if "token" in cred:
try:
client = login_token(cred)
client.exec()
except:
logging.status("FAIL", "token not authorised").status_update(client)
elif all(param in cred for param in ['username', 'password']):
try:
client = login_cred(sio, sid, cred)
client.exec()
except:
logging.status("FAIL", "login failed").status_update(client)
else:
client = error_process()
logging.status("FAIL", "no credentials provided").status_update(client)
client.level = user_info.level(user_id=client.id).get()
if client.level:
client.level = client.level['level']
return client.status, client.id, client.level
def register(cred):
if all(param in cred for param in ['username', 'password', 'repassword', 'key']):
try:
client = reg_cred(cred)
client.exec()
except:
logging.status("FAIL", "registration failed").status_update(client)
else:
client = generic_process()
logging.status("FAIL", "no credentials provided").status_update(client)
return client.status
def admin_register(cred):
if all(param in cred for param in ['username', 'password', 'repassword', 'key']):
try:
client = reg_admin(cred)
if client.key_verify() == True and client.first_time() == True:
client.exec()
else:
logging.status("FAIL", "admin key does not match/admin already exists").status_update(client)
except:
logging.status("FAIL", "registration failed").status_update(client)
else:
client = error_process()
logging.status("FAIL", "no credentials provided").status_update(client)
return client.status
def authorised(sio, sid, min_level='admin'):
level_list = ['member', 'management', 'admin']
allow_levels = level_list[level_list.index(min_level):]
level = sio.get_session(sid)['level']
if level in allow_levels:
user_authorised = True
else:
user_authorised = False
return user_authorised
def main():
error = error_process()
if __name__ == "__main__":
main()