from enum import Enum
from typing import List, Tuple, Optional, Any, Union
import sqlite3
from CryptoPrice.storage.tables import Table
from CryptoPrice.utils.LoggerGenerator import LoggerGenerator
from CryptoPrice.utils.paths import get_data_path
[docs]class SQLConditionEnum(Enum):
"""
https://www.techonthenet.com/sqlite/comparison_operators.php
"""
equal = '='
greater_equal = '>='
greater = '>'
lower = '<'
lower_equal = '<='
diff = '!='
[docs]class DataBase:
"""
This class will be used to interact with sqlite3 databases without having to generates sqlite commands
"""
[docs] def __init__(self, name: str):
"""
Instantiate a database object, the name will be used for the saving file
:param name: name of the database
:type name: str
"""
self.name = name
self.logger = LoggerGenerator.get_logger(self.name)
self.save_path = get_data_path() / f"{name}.db"
self.db_conn = sqlite3.connect(self.save_path)
self.db_cursor = self.db_conn.cursor()
def _fetch_rows(self, execution_cmd: str):
"""
execute a command to fetch some rows and return them
:param execution_cmd: the command to execute
:return:
"""
rows = []
try:
self.db_cursor.execute(execution_cmd)
except sqlite3.OperationalError:
return rows
while True:
row = self.db_cursor.fetchone()
if row is None:
break
rows.append(row)
return rows
[docs] def get_row_by_key(self, table: Table, key_value) -> Optional[Tuple]:
"""
get the row identified by a primary key value from a table
:param table: table to fetch the row from
:param key_value: key value of the row
:return: None or the row of value
"""
if table.primary_key is None:
raise ValueError(f"table {table.name} has no explicit primary key")
conditions_list = [(table.primary_key, SQLConditionEnum.equal, key_value)]
rows = self.get_conditions_rows(table, conditions_list=conditions_list)
if len(rows):
return rows[0]
[docs] def get_conditions_rows(self, table: Table,
selection: Optional[Union[str, List[str]]] = None,
conditions_list: Optional[List[Tuple[str, SQLConditionEnum, Any]]] = None,
order_list: Optional[List[str]] = None) -> List:
if selection is None:
selection = '*'
elif isinstance(selection, List):
selection = ','.join(selection)
if conditions_list is None:
conditions_list = []
if order_list is None:
order_list = []
execution_cmd = f"SELECT {selection} from {table.name}"
execution_cmd = self._add_conditions(execution_cmd, conditions_list=conditions_list)
execution_cmd = self._add_order(execution_cmd, order_list=order_list)
return self._fetch_rows(execution_cmd)
[docs] def get_all_rows(self, table: Table) -> List:
return self.get_conditions_rows(table)
[docs] def add_row(self, table: Table, row: Tuple, auto_commit: bool = True, update_if_exists: bool = False):
row_s = ", ".join(f"'{v}'" for v in row)
row_s = f'({row_s})'
execution_order = f"INSERT INTO {table.name} VALUES {row_s}"
try:
self.db_cursor.execute(execution_order)
if auto_commit:
self.commit()
except sqlite3.OperationalError:
self.create_table(table)
self.db_cursor.execute(execution_order)
if auto_commit:
self.commit()
except sqlite3.IntegrityError as err:
if update_if_exists:
self.update_row(table, row, auto_commit)
else:
raise err
[docs] def add_rows(self, table: Table, rows: List[Tuple], auto_commit: bool = True, update_if_exists: bool = False):
for row in rows:
self.add_row(table, row, auto_commit=False, update_if_exists=update_if_exists)
if auto_commit:
self.commit()
[docs] def update_row(self, table: Table, row: Tuple, auto_commit=True):
row_s = ", ".join(f"{n} = {v}" for n, v in zip(table.columns_names, row))
execution_order = f"UPDATE {table.name} SET {row_s} WHERE {table.primary_key} = {row[0]}"
self.db_cursor.execute(execution_order)
if auto_commit:
self.commit()
[docs] def create_table(self, table: Table):
"""
create a table in the database
:param table: Table instance with the config of the table to create
:return:
"""
create_cmd = self.get_create_cmd(table)
self.db_cursor.execute(create_cmd)
self.db_conn.commit()
[docs] def drop_table(self, table: Union[Table, str]):
"""
delete a table from the database
:param table: table or table name to drop
:type table: str or Table instance
:return: None
:rtype: None
"""
self.drop_tables([table])
[docs] def drop_tables(self, tables: List[Union[Table, str]]):
"""
Delete a list of tables if they exist
:param tables: list of tables to delete
:type tables: List[Union[Table, str]]
:return: None
:rtype: None
"""
for table in tables:
if isinstance(table, Table):
table = table.name
execution_order = f"DROP TABLE IF EXISTS {table}"
self.db_cursor.execute(execution_order)
self.commit()
[docs] def drop_all_tables(self):
"""
drop all the tables existing in the database
:return: None
:rtype: None
"""
tables = [t[1] for t in self.get_tables_descriptions()]
self.drop_tables(tables)
[docs] def get_tables_descriptions(self) -> List[Tuple]:
"""
return the descriptions of all the tables existing in the database
:return: tables descriptions
:rtype: List[Tuple]
"""
cmd = "SELECT * FROM sqlite_master WHERE type='table';"
return self._fetch_rows(cmd)
[docs] def commit(self):
"""
submit and save the database state
:return:
"""
self.db_conn.commit()
@staticmethod
def _add_conditions(execution_cmd: str, conditions_list: List[Tuple[str, SQLConditionEnum, Any]]):
"""
add a list of condition to an SQL command
:param execution_cmd: SQL command without 'WHERE' statement
:type execution_cmd: str
:param conditions_list:
:return:
"""
if len(conditions_list):
add_cmd = ' WHERE'
for column_name, condition, value in conditions_list:
add_cmd = add_cmd + f" {column_name} {condition.value} '{value}' AND"
return execution_cmd + add_cmd[:-4]
else:
return execution_cmd
@staticmethod
def _add_order(execution_cmd: str, order_list: List[str]):
"""
add an order specification to an SQL command
:param execution_cmd: SQL command without 'ORDER BY' statement
:type execution_cmd: str
:param order_list:
:type order_list:
:return:
:rtype:
"""
if len(order_list):
add_cmd = ' ORDER BY'
for column_name in order_list:
add_cmd = add_cmd + f" {column_name},"
return execution_cmd + add_cmd[:-1] + ' ASC'
else:
return execution_cmd
[docs] @staticmethod
def get_create_cmd(table: Table):
"""
return the command in string format to create a table in the database
:param table: Table instance with the config if the table to create
:return: execution command for the table creation
"""
cmd = ""
if table.primary_key is not None:
cmd = f"[{table.primary_key}] {table.primary_key_sql_type} PRIMARY KEY, "
for arg_name, arg_type in zip(table.columns_names, table.columns_sql_types):
cmd = cmd + f"[{arg_name}] {arg_type}, "
return f"CREATE TABLE {table.name}\n({cmd[:-2]})"