694 lines
25 KiB
Python
694 lines
25 KiB
Python
import sqlite3
|
|
import os
|
|
import ctypes
|
|
import pathlib
|
|
|
|
import base64
|
|
from cryptography.fernet import Fernet
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
|
|
|
# db encrypt
|
|
from cryptography.fernet import Fernet
|
|
#from pysqlcipher3 import dbapi2 as sqlite3
|
|
# db encrypt
|
|
|
|
from modules.track.logging import log
|
|
from modules.data.config import read as config_read
|
|
from modules.algorithms.uuid import generate as uuid_generate
|
|
|
|
class connect():
    """Small wrapper around a SQLite connection to the configured database.

    The database location is read from the ``Path`` option of the
    ``database`` config section. :meth:`create` must be called before
    :meth:`commit`, :meth:`close` or :meth:`execute`, because it is the
    method that opens the connection.
    """

    def __init__(self):
        self.path = config_read("database", "Path")

    def create(self, obj):
        """Open the connection and a cursor, optionally sharing them.

        Args:
            obj: Any object, or ``None``. When given, ``obj.con`` and
                ``obj.cur`` are set to the new connection and cursor so the
                caller can run queries directly.
        """
        self.con = sqlite3.connect(self.path)
        self.cur = self.con.cursor()

        if obj is not None:
            obj.con = self.con
            obj.cur = self.cur

    def commit(self):
        """Commit the pending transaction."""
        self.con.commit()

    def close(self):
        """Commit the pending transaction, then close the connection."""
        self.con.commit()
        self.con.close()

    def execute(self, command, values=None):
        """Run one statement, then commit and close the connection.

        Args:
            command: SQL text, optionally containing ``?`` placeholders.
            values: Parameters for the placeholders, or ``None`` when the
                statement has none.
        """
        cur = self.con.cursor()
        # sqlite3 rejects ``None`` as the parameter container ("parameters
        # are of unsupported type"), so a parameterless call must pass an
        # empty tuple instead.
        cur.execute(command, () if values is None else values)
        self.close()
|
|
|
|
# Table creation
|
|
class create():
    """Creates the application's tables in a brand-new database file.

    ``path`` is the plain database file and ``en_path`` the encrypted one;
    both come from the ``database`` config section.
    """

    def __init__(self):
        self.path = config_read("database", "Path")
        self.en_path = config_read("database", "EncryptedPath")

    def tables(self):
        """Create every table, unless a database already exists.

        Nothing is done when either the plain or the encrypted database file
        is present. Otherwise a new database is created at ``self.path`` and
        all tables are added to it. The connection is committed and closed
        before returning, so ``self.cur`` must not be used afterwards.
        """
        if os.path.exists(self.path) or os.path.exists(self.en_path):
            return

        con = sqlite3.connect(self.path)
        self.cur = con.cursor()

        builders = (
            self.auth_credentials,
            self.auth_tokens,
            self.profile,
            self.friends,
            self.occupations,
            self.occupation_requests,
            self.teams,
            self.team_leaders,
            self.posts,
            self.comments,
            self.post_impressions,
            self.comment_impressions,
            self.time_slots,
            self.notifications,
            self.notifications_sent,
        )
        try:
            for build in builders:
                build()
            con.commit()
        finally:
            # Always release the file handle, even if a statement fails.
            con.close()

    def auth_credentials(self):
        """Create the ``auth_credentials`` table (one row per user login)."""
        self.cur.execute("""
            CREATE TABLE IF NOT EXISTS auth_credentials (
                user_id TEXT NOT NULL PRIMARY KEY,
                username TEXT NOT NULL,
                password TEXT NOT NULL,
                level TEXT NOT NULL,
                FOREIGN KEY (user_id)
                    REFERENCES profile (user_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE
            )
        """)

    def auth_tokens(self):
        """Create the ``auth_tokens`` table, keyed by token."""
        self.cur.execute("""
            CREATE TABLE IF NOT EXISTS auth_tokens (
                user_id TEXT NOT NULL,
                token TEXT NOT NULL PRIMARY KEY,
                token_expire REAL NOT NULL,
                FOREIGN KEY (user_id)
                    REFERENCES auth_credentials (user_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE
            )
        """)

    def profile(self):
        """Create the ``profile`` table; deleting an occupation nulls the link."""
        self.cur.execute("""
            CREATE TABLE IF NOT EXISTS profile (
                user_id TEXT NOT NULL PRIMARY KEY,
                occupation_id TEXT,
                name TEXT,
                picture TEXT,
                biography TEXT,
                role TEXT,
                num_friends INTEGER DEFAULT 0,
                FOREIGN KEY (occupation_id)
                    REFERENCES occupations (occupation_id)
                    ON UPDATE CASCADE
                    ON DELETE SET NULL
            )
        """)

    def friends(self):
        """Create the ``friends`` table, keyed by (user_id, friend_id)."""
        self.cur.execute("""
            CREATE TABLE IF NOT EXISTS friends (
                user_id TEXT NOT NULL,
                friend_id TEXT NOT NULL,
                approved BOOLEAN,
                FOREIGN KEY (user_id)
                    REFERENCES profile (user_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE,
                FOREIGN KEY (friend_id)
                    REFERENCES profile (user_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE,
                PRIMARY KEY (user_id, friend_id)
            )
        """)

    def occupations(self):
        """Create the ``occupations`` table."""
        self.cur.execute("""
            CREATE TABLE IF NOT EXISTS occupations (
                occupation_id TEXT NOT NULL PRIMARY KEY,
                name TEXT NOT NULL,
                description TEXT
            )
        """)

    def occupation_requests(self):
        """Create the ``occupation_requests`` table (one request per user)."""
        self.cur.execute("""
            CREATE TABLE IF NOT EXISTS occupation_requests (
                user_id TEXT NOT NULL PRIMARY KEY,
                occupation_id TEXT NOT NULL,
                approved BOOLEAN DEFAULT False NOT NULL,
                FOREIGN KEY (user_id)
                    REFERENCES profile (user_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE,
                FOREIGN KEY (occupation_id)
                    REFERENCES occupations (occupation_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE
            )
        """)

    def teams(self):
        """Create the ``teams`` table."""
        self.cur.execute("""
            CREATE TABLE IF NOT EXISTS teams (
                team_id TEXT NOT NULL PRIMARY KEY,
                name TEXT NOT NULL,
                occupation_id TEXT,
                user_id TEXT,
                FOREIGN KEY (occupation_id)
                    REFERENCES occupations (occupation_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE,
                FOREIGN KEY (user_id)
                    REFERENCES profile (user_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE
            )
        """)

    def team_leaders(self):
        """Create the ``team_leaders`` table, keyed by (user_id, team_id)."""
        self.cur.execute("""
            CREATE TABLE IF NOT EXISTS team_leaders (
                user_id TEXT NOT NULL,
                team_id TEXT NOT NULL,
                FOREIGN KEY (user_id)
                    REFERENCES profile (user_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE,
                FOREIGN KEY (team_id)
                    REFERENCES teams (team_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE,
                PRIMARY KEY (user_id, team_id)
            )
        """)

    def posts(self):
        """Create the ``posts`` table."""
        self.cur.execute("""
            CREATE TABLE IF NOT EXISTS posts (
                post_id TEXT NOT NULL PRIMARY KEY,
                user_id TEXT NOT NULL,
                content TEXT NOT NULL,
                caption TEXT,
                date TEXT NOT NULL,
                FOREIGN KEY (user_id)
                    REFERENCES profile (user_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE
            )
        """)

    def comments(self):
        """Create the ``comments`` table."""
        self.cur.execute("""
            CREATE TABLE IF NOT EXISTS comments (
                comment_id TEXT NOT NULL PRIMARY KEY,
                post_id TEXT NOT NULL,
                user_id TEXT NOT NULL,
                content TEXT NOT NULL,
                FOREIGN KEY (post_id)
                    REFERENCES posts (post_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE,
                FOREIGN KEY (user_id)
                    REFERENCES profile (user_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE
            )
        """)

    def post_impressions(self):
        """Create the ``post_impressions`` table."""
        # The id columns are declared TEXT to match the keys they reference.
        # ``type`` keeps its original untyped declaration because the stored
        # value type is not visible from this module.
        self.cur.execute("""
            CREATE TABLE IF NOT EXISTS post_impressions (
                impression_id TEXT NOT NULL PRIMARY KEY,
                post_id TEXT NOT NULL,
                user_id TEXT NOT NULL,
                type NOT NULL,
                FOREIGN KEY (post_id)
                    REFERENCES posts (post_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE,
                FOREIGN KEY (user_id)
                    REFERENCES profile (user_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE
            )
        """)

    def comment_impressions(self):
        """Create the ``comment_impressions`` table."""
        # Same typing choice as ``post_impressions``: ids are TEXT, ``type``
        # is left untyped.
        self.cur.execute("""
            CREATE TABLE IF NOT EXISTS comment_impressions (
                impression_id TEXT NOT NULL PRIMARY KEY,
                comment_id TEXT NOT NULL,
                user_id TEXT NOT NULL,
                type NOT NULL,
                FOREIGN KEY (comment_id)
                    REFERENCES comments (comment_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE,
                FOREIGN KEY (user_id)
                    REFERENCES profile (user_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE
            )
        """)

    def time_slots(self):
        """Create the ``time_slots`` table, keyed by date."""
        self.cur.execute("""
            CREATE TABLE IF NOT EXISTS time_slots (
                date TEXT NOT NULL PRIMARY KEY,
                start FLOAT NOT NULL,
                end FLOAT NOT NULL
            )
        """)

    def notifications(self):
        """Create the ``notifications`` table."""
        self.cur.execute("""
            CREATE TABLE IF NOT EXISTS notifications (
                notification_id TEXT NOT NULL PRIMARY KEY,
                target_id TEXT NOT NULL,
                title TEXT NOT NULL,
                content TEXT,
                time_created FLOAT NOT NULL,
                expire_after FLOAT NOT NULL
            )
        """)

    def notifications_sent(self):
        """Create the ``notifications_sent`` table, keyed by (notification_id, user_id)."""
        self.cur.execute("""
            CREATE TABLE IF NOT EXISTS notifications_sent (
                notification_id TEXT NOT NULL,
                user_id TEXT NOT NULL,
                time_sent FLOAT,
                sent BOOLEAN DEFAULT False NOT NULL,
                PRIMARY KEY (notification_id, user_id),
                FOREIGN KEY (notification_id)
                    REFERENCES notifications (notification_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE,
                FOREIGN KEY (user_id)
                    REFERENCES profile (user_id)
                    ON UPDATE CASCADE
                    ON DELETE CASCADE
            )
        """)
|
|
|
|
class encryption():
    """Encrypts/decrypts the database file as a whole and picks the launch mode.

    The plain database lives at ``db_path`` and its encrypted copy at
    ``en_db_path``. The master password is read once from the file at
    ``en_config_path`` and, when Shamir Secret Sharing is enabled, also split
    into shares.
    """

    def __init__(self, session):
        self.key = key()
        # needs to pass num_shares and min_shares
        self.session = session

        self.sss_enabled = config_read("database", "ShamirSecretSharing")
        self.en_config_path = config_read("database", "EncryptionConfigPath")
        self.db_path = config_read("database", "Path")
        self.en_db_path = config_read("database", "EncryptedPath")

    def mode(self):
        """Decide the launch mode and flags and store them on the session.

        * encryption enabled, database encrypted -> ``"decrypt"``
        * encryption enabled, database plain -> encrypt it now, then
          ``"decrypt"``; the process exits if encryption fails
        * encryption disabled, database encrypted -> ``"decrypt"`` with the
          ``"forever"`` flag
        * encryption disabled, database plain -> ``"normal"``
        """
        encryption_enabled = config_read("database", "Encrypt")
        db_encrypted = self.key.is_db_encrypted()

        flags = []
        if encryption_enabled:
            if not db_encrypted and not self.encrypt():
                # Same effect as the interactive ``exit()`` helper (exit
                # status unchanged) without depending on the ``site`` module.
                raise SystemExit
            mode = "decrypt"
        elif db_encrypted:
            mode = "decrypt"
            flags = ["forever"]
        else:
            mode = "normal"
            self.session.db_encrypted = False

        self.session.mode = mode
        self.session.flags = flags

    def encrypt(self, flags=None):
        """Encrypt the plain database file and delete the plain copy.

        Args:
            flags: Currently unused; accepted for symmetry with
                :meth:`decrypt`.

        Returns:
            ``True`` on success, ``False`` when no password is available or
            the database key cannot be unlocked with it.
        """
        password = self.session.password or self._generate()
        if not password:
            log("FAIL", "Could not encrypt database, something went wrong, see logs for details")
            return False

        scheme = self.key.read_db_scheme(password)
        if not scheme:
            # read_db_scheme already logged the reason; fail cleanly instead
            # of raising AttributeError on ``None.encrypt``.
            log("FAIL", "Could not encrypt database, the database key could not be unlocked")
            return False

        with open(self.db_path, "rb") as db:
            db_data = db.read()

        # create new encrypted database
        log("INFO", "Encrypting database")
        en_db_data = scheme.encrypt(db_data)
        with open(self.en_db_path, "wb") as en_db:
            en_db.write(en_db_data)

        # Delete the unencrypted database. Use the configured path: a
        # hard-coded "data/database.db" would leave the plain file behind
        # whenever the database is configured to live elsewhere.
        os.remove(self.db_path)
        log("INFO", "Deleted unencrypted database")
        return True

    def decrypt(self, data, flags=None):
        """Decrypt the encrypted database into the plain database file.

        Args:
            data: Mapping holding ``'shares'`` (when ``"sss"`` is in
                ``flags``) or ``'password'`` otherwise.
            flags: Optional list of flags. ``"sss"`` rebuilds the password
                from shares and writes it to the encryption config file;
                ``"forever"`` disables re-encryption on shutdown and deletes
                the salt, key and encrypted database files.

        Returns:
            ``True`` on success, ``False`` when the password is rejected or
            the decrypted file cannot be read as the expected database.
        """
        flags = [] if flags is None else flags

        min_shares = config_read('database', 'MinimumShares')
        if "sss" in flags:
            password = int(shares(min_shares).get_key(data['shares']))
        else:
            password = int(data['password'])

        scheme = self.key.read_db_scheme(password)
        if not scheme:
            return False

        # decrypting the database raw bytes
        with open(self.en_db_path, "rb") as en_db:
            en_db_data = en_db.read()

        db_data = scheme.decrypt(en_db_data)
        with open(self.db_path, "wb") as db:
            db.write(db_data)

        if not self._database_read():
            log("FAIL", "Decryption of database failed, see logs for details")
            return False
        log("INFO", "Decryption of database successful")

        self.session.password = password
        for flag in flags:
            if flag == "forever":
                log("WARN", "Permanent decryption of the database")
                self.session.encrypt_on_shutdown = False
                self.key.delete()
            elif flag == "sss":
                with open(self.en_config_path, "w") as en_config:
                    en_config.write(str(password))
                log("WARN", f"You decrypted the database using Shamir secret shares, your master password has been reconstructed and can be found on the server at the location: {self.en_config_path}. Please remember to delete this file after reading")

        self.session.db_encrypted = False
        self.session.mode = "normal"
        return True

    def _generate(self):
        """Set up encryption from the config file and return the master password.

        Writes the key file, generates the Shamir shares when enabled, and
        finally deletes the config file holding the master password.

        Returns:
            The master password as an ``int``, or ``None`` on failure.
        """
        options = self._read_config()
        if not self._config_check(options):
            log("FAIL", "Could not generate encryption scheme, something wrong in config file or with maseter password")
            return None

        password = int(options['password'])
        self.key.generate_key_file(password)

        if self.sss_enabled:
            log("INFO", "Shamir Secret Sharing enabled, generating shares")
            sss = shares(int(options['min_shares']), int(options['num_shares']))
            if not sss.generate_shares(password):
                log("FAIL", "Something went wrong generating shamir secret shares, see log for details")
                return None

        log("INFO", "Deleting encryption configuration file containing master password")
        os.remove(self.en_config_path)
        return password

    def _read_config(self):
        """Read the master password (and share counts) into a dict.

        Returns:
            ``{'password': <file contents>}`` plus ``'num_shares'`` and
            ``'min_shares'`` when Shamir Secret Sharing is enabled, or
            ``None`` when the config file cannot be read.
        """
        num_shares = config_read("database", "NumberOfShares")
        min_shares = config_read("database", "MinimumShares")
        options = {}
        try:
            with open(self.en_config_path, "r") as config:
                log("INFO", "Reading encryption configuration file")
                options['password'] = config.read()
        except (OSError, ValueError):
            # Missing/unreadable file, or contents that cannot be decoded.
            return None

        if self.sss_enabled:
            options['num_shares'] = num_shares
            options['min_shares'] = min_shares
        return options

    def _config_check(self, options):
        """Validate the options produced by :meth:`_read_config`.

        Returns:
            ``True`` when the config file exists, the password is an integer
            and, with Shamir Secret Sharing enabled, ``num_shares < 20`` and
            ``min_shares < 7``; ``False`` otherwise.
        """
        # checking if the file exists
        try:
            with open(self.en_config_path, "r"):
                pass
        except (OSError, TypeError):
            log("FAIL", f"Encryption config could not be found at {self.en_config_path}")
            return False

        # check config contents
        try:
            log("INFO", "Testing master password type (must be int)")
            int(options['password'])
            if len(options) == 1:
                return True

            if self.sss_enabled and len(options) == 3:
                log("INFO", "Testing number of shares type (must be integer)")
                num_shares = int(options['num_shares'])
                log("INFO", "Testing minimum shares type (must be integer)")
                min_shares = int(options['min_shares'])

                if num_shares < 20 and min_shares < 7:
                    return True
                log("WARN", "SSS number of shares is to large or minimum shares is to large")
                return False

            log("WARN", "Something went wrong reading config file, check the docs for a guide")
            return False
        except (TypeError, KeyError, ValueError):
            # ``options`` is None, a key is missing, or a value is not an int.
            log("WARN", "The master password, number of shares and minimum shares all must be integers")
            return False

    def _database_read(self):
        """Return whether the plain database file is a readable database.

        Runs a probe query against ``time_slots``. The probe connection is
        closed afterwards so it does not keep the database file open; the
        ``con``/``cur`` attributes that ``connect.create`` sets on this
        object therefore refer to a closed connection.
        """
        try:
            db = connect()
            db.create(self)
        except Exception:
            return False

        try:
            db.cur.execute("SELECT * FROM time_slots")
            return True
        except sqlite3.Error:
            return False
        finally:
            db.con.close()
|
|
|
|
class key():
    """Manages the salt, the key file and the Fernet schemes built from them.

    The key file stores a random database password encrypted with a scheme
    derived from the master password. The database itself is encrypted with
    a second scheme derived from that database password.
    """

    def __init__(self):
        self.key_path = config_read("database", "KeyPath")
        self.db_path = config_read("database", "Path")
        self.en_db_path = config_read("database", "EncryptedPath")
        self.salt_path = config_read("database", "SaltPath")

    def _save_salt(self, salt):
        """Write the salt bytes to the salt file."""
        with open(self.salt_path, "wb") as salt_file:
            salt_file.write(salt)

    def _read_salt(self):
        """Return the stored salt bytes, or ``None`` if they cannot be read."""
        try:
            with open(self.salt_path, "rb") as salt_file:
                return salt_file.read()
        except (OSError, TypeError):
            return None

    def _pass_to_scheme(self, password):
        """Derive a Fernet scheme from ``password`` and the stored salt.

        A new 16-byte salt is generated and saved when none is stored yet.
        """
        # NOTE: ``str()`` is applied to whatever is passed in, so a ``bytes``
        # password is encoded via its ``b'...'`` repr. Existing key files and
        # databases depend on this exact derivation; do not "fix" it.
        password = str(password).encode()
        salt = self._read_salt()
        if not salt:
            salt = os.urandom(16)
            self._save_salt(salt)

        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=480000,
        )
        return Fernet(base64.urlsafe_b64encode(kdf.derive(password)))

    def read_db_scheme(self, password):
        """Unlock the key file and return the database Fernet scheme.

        Args:
            password: The master password.

        Returns:
            The scheme used for the database file, or ``None`` (after logging
            a warning) when the key file is missing/unreadable or cannot be
            decrypted with ``password``.
        """
        file_scheme = self._pass_to_scheme(password)

        try:
            with open(self.key_path, "r") as key_file:
                en_password = key_file.read()
            db_password = file_scheme.decrypt(en_password)
            return self._pass_to_scheme(db_password)
        except Exception:
            # Covers a missing key file as well as a rejected token; callers
            # treat ``None`` as "wrong password or broken key".
            log("WARN", "Provided password is wrong or something is wrong with the database key")
            return None

    def generate_key_file(self, password):
        """Create a random database password and store it encrypted.

        The database password is encrypted with the scheme derived from the
        master ``password`` and written to the key file.
        """
        db_password = uuid_generate().replace("-", "").encode()
        file_scheme = self._pass_to_scheme(password)
        en_db_password = file_scheme.encrypt(db_password).decode()
        with open(self.key_path, "w") as key_file:
            key_file.write(en_db_password)

    def delete(self):
        """Remove the salt file, the key file and the encrypted database."""
        os.remove(self.salt_path)
        os.remove(self.key_path)
        os.remove(self.en_db_path)

    def is_db_encrypted(self):
        """Return ``True`` when the encrypted database file can be opened."""
        try:
            # The ``with`` block closes the probe handle again; leaving it
            # open would leak a file descriptor on every call.
            with open(self.en_db_path, "rb"):
                return True
        except (OSError, TypeError):
            return False
|
|
|
|
class ShareStruct(ctypes.Structure):
    """C structure for one share: a ``long long`` ``y`` and an ``int`` ``x``."""

    # ctypes only reads the field layout from ``_fields_``; under any other
    # name (such as ``__fields__``) the structure would have no fields.
    _fields_ = [("y", ctypes.c_longlong), ("x", ctypes.c_int)]
|
|
|
|
# this class is mainly geared towards acting as an interface for the c++ code
|
|
class shares():
    """Python-side interface to the C++ Shamir Secret Sharing library.

    The shared library is loaded from ``modules/data/libcppsss.so`` relative
    to the current working directory. Share files are read from
    ``shares_path`` (the ``SharesPath`` config option) as
    ``share-<n>.txt`` with ``n`` starting at 1.
    """

    def __init__(self, min_shares, num_shares=None):
        # Always define the attribute so later use fails with a clear value
        # instead of an AttributeError when ``num_shares`` was not supplied.
        self.num_shares = int(num_shares) if num_shares else None
        self.min_shares = int(min_shares)
        self.shares_path = config_read("database", "SharesPath")

    def _load_library(self):
        """Load the C++ shared library from the working directory."""
        libname = pathlib.Path().absolute() / "modules/data/libcppsss.so"
        return ctypes.CDLL(str(libname))

    def _dict_to_c_array(self, share_list):
        """Convert share dicts into a C array of ``long long[2]`` pairs.

        Args:
            share_list: Sequence of dicts with integer ``'num'`` and
                ``'secret'`` entries.

        Returns:
            A ctypes array with one ``(num, secret)`` pair per share.
        """
        pair_type = ctypes.c_longlong * 2
        pairs = [pair_type(share['num'], share['secret']) for share in share_list]
        return (pair_type * len(pairs))(*pairs)

    def generate_shares(self, password):
        """Split ``password`` into share files and verify them.

        Returns:
            The result of :meth:`verify` for the generated shares.
        """
        c_lib = self._load_library()

        # ``c_char_p`` accepts the ``c_char_p`` instance passed below;
        # ``POINTER(c_char)`` does not and makes ctypes reject the call.
        c_lib.newSecretInternal.argtypes = [ctypes.c_longlong, ctypes.c_int, ctypes.c_int, ctypes.c_char_p]
        # The attribute is ``restype``; ``restypes`` is silently ignored.
        c_lib.newSecretInternal.restype = None

        path_ptr = ctypes.c_char_p(self.shares_path.encode('utf-8'))
        c_lib.newSecretInternal(password, self.num_shares, self.min_shares, path_ptr)

        return self.verify(password)

    def get_key(self, share_list):
        """Reconstruct the secret from ``min_shares`` shares.

        Args:
            share_list: Sequence of dicts with ``'num'`` and ``'secret'``;
                its length must equal ``min_shares`` to match the declared
                argument type.

        Returns:
            The integer returned by the library's ``solveInternal``.
        """
        c_lib = self._load_library()

        share_array_type = (ctypes.c_longlong * 2) * self.min_shares
        c_lib.solveInternal.argtypes = [ctypes.POINTER(share_array_type), ctypes.c_int]
        # Explicit C int: this is what ctypes already assumed, since the old
        # ``restypes`` assignment had no effect.
        # NOTE(review): if solveInternal returns a 64-bit value this should be
        # ctypes.c_longlong, otherwise large secrets are truncated - confirm
        # against the C++ signature.
        c_lib.solveInternal.restype = ctypes.c_int

        share_array_ptr = ctypes.pointer(self._dict_to_c_array(share_list))
        return c_lib.solveInternal(share_array_ptr, self.min_shares)

    def verify(self, password):
        """Check that the generated shares reconstruct ``password``.

        Every window of ``min_shares`` consecutive share files is tested,
        from shares ``1..min_shares`` up to and including the window that
        ends at share ``num_shares``.

        Returns:
            ``True`` when every window reproduces ``password``; ``False`` if
            a share file is missing or malformed, a window gives a different
            result, or ``min_shares`` exceeds ``num_shares``.
        """
        # used to verify that the shamir secret shares generated can be used to reconstruct the original key
        log("INFO", "Verifying share integrity")
        # We take a sample of the shares: if the windows of the smallest and
        # of the largest share numbers all work, any combination is assumed
        # to work. The only expected failure is shares that have become so
        # large that the C++ side starts to lose accuracy.
        # ``+ 1`` so the final window, which contains the last share, is
        # tested as well.
        windows = self.num_shares - self.min_shares + 1
        if windows < 1:
            log("WARN", "Cannot verify shares, minimum shares is larger than the number of shares")
            return False

        for i in range(windows):
            top = i + self.min_shares

            shares_used = ", ".join(str(num_share) for num_share in range(i, top))
            log("INFO", f"Attempting to generate original password with shares: {shares_used}")

            share_list = []
            for j in range(i, top):
                # reads the shares from their files
                path = self.shares_path + f"share-{j+1}.txt"
                try:
                    with open(path, "r") as share:
                        x = int((share.readline().split(": "))[1])
                        y = int((share.readline().split(": "))[1])
                except (OSError, ValueError, IndexError):
                    log("WARN", "Something went wrong reading one of the shares, have they been altered?")
                    # Stop here: solving with an incomplete set of shares
                    # cannot succeed.
                    return False
                share_list.append({'num': x, 'secret': y})

            if self.get_key(share_list) != password:
                log("WARN", "A set of shares could not be used to generate the original password, try again or use a diffrent password")
                return False
            log("INFO", f"{i+1}/{windows} sets of shares successfully used to generate the original password")

        return True
|
|
|
|
|
|
def main():
    """Build the schema in ``database.db`` instead of the configured path."""
    builder = create()
    # Override the configured location with a file relative to the current
    # working directory.
    builder.path = "database.db"
    builder.tables()


if __name__ == "__main__":
    main()
|
|
|
|
class retrieve():
    """Single-value lookups against ``auth_credentials`` and ``profile``.

    Every lookup returns the requested column of the first matching row, or
    ``None`` when no row matches.
    """

    def __init__(self):
        # ``db_connect`` is not defined or imported anywhere in this module;
        # the connection class declared here is ``connect``.
        self.db = connect()
        self.db.create(self)

    def _fetch_value(self, query, params):
        """Run ``query`` and return the first column of the first row, or ``None``."""
        self.cur.execute(query, params)
        row = self.cur.fetchone()
        return row[0] if row else None

    def level(self, identifier):
        """Return the level of the user whose username or user id is ``identifier``."""
        return self._fetch_value(
            "SELECT level FROM auth_credentials WHERE username = ? OR user_id = ?",
            (identifier, identifier),
        )

    def user_id(self, username):
        """Return the user id stored for ``username``."""
        return self._fetch_value(
            "SELECT user_id FROM auth_credentials WHERE username = ?",
            (username,),
        )

    def username(self, user_id):
        """Return the username stored for ``user_id``."""
        return self._fetch_value(
            "SELECT username FROM auth_credentials WHERE user_id = ?",
            (user_id,),
        )

    def occupation_id(self, user_id):
        """Return the occupation id from the profile of ``user_id``."""
        return self._fetch_value(
            "SELECT occupation_id FROM profile WHERE user_id = ?",
            (user_id,),
        )
|