import sqlite3
from typing import List, Optional, Tuple
from CryptoPrice.storage.DataBase import DataBase, SQLConditionEnum
from CryptoPrice.common.prices import Kline
from CryptoPrice.storage.tables import KlineTable, KlineCacheTable
from CryptoPrice.utils.time import TIMEFRAME
[docs]class KlineDataBase(DataBase):
[docs] def __init__(self, name: str):
"""
Instantiate a kline database object, the name will be used for the saving file
:param name: name of the database
:type name: str
"""
super().__init__(name)
[docs] def add_klines(self, klines: List[Kline], ignore_if_exists: bool = False):
"""
add several klines to the database
:param klines: list of klines to add to the database
:type klines: List[Kline]
:param ignore_if_exists: if integrity errors should be ignored, default False
:type ignore_if_exists: bool
:return: None
:rtype: None
"""
for kline in klines:
table = KlineTable(klines[0].asset, kline.ref_asset, kline.timeframe)
row = (kline.open_timestamp, kline.open, kline.high, kline.low, kline.close)
try:
self.add_row(table, row, auto_commit=False)
except sqlite3.IntegrityError as err:
if not ignore_if_exists:
raise err
self.commit()
[docs] def get_klines(self, asset: str, ref_asset: str, timeframe: TIMEFRAME, start_time: Optional[int] = None,
end_time: Optional[int] = None) -> List[Kline]:
"""
return the klines corresponding to a trading pair and a timeframe
a time window can also be provided.
:param asset: asset of the trading pair
:type asset: str
:param ref_asset: reference asset of the trading pair
:type ref_asset: str
:param timeframe: timeframe for the kline
:type timeframe: TIMEFRAME
:param start_time: fetch only klines with an open time greater or equal than start_time
:type start_time: Optional[int]
:param end_time: fetch only klines with an open time lower than end_time
:type end_time: Optional[int]
:return: list of klines
:rtype: List[Klines]
"""
table = KlineTable(asset, ref_asset, timeframe)
conditions_list = []
if start_time is not None:
conditions_list.append((table.open_timestamp,
SQLConditionEnum.greater_equal,
start_time))
if end_time is not None:
conditions_list.append((table.open_timestamp,
SQLConditionEnum.lower,
end_time))
rows = self.get_conditions_rows(table, conditions_list=conditions_list)
return [self.row_to_kline(asset, ref_asset, timeframe, r) for r in rows]
[docs] def get_closest_kline(self, asset: str, ref_asset: str, timeframe: TIMEFRAME, timestamp: int,
window: int = 120) -> Optional[Kline]:
"""
Return the closest Kline in a time window for a trading pair and a timeframe.
If there is no Kline, None is returned
:param asset: asset of the trading pair
:type asset: str
:param ref_asset: reference asset of the trading pair
:type ref_asset: str
:param timeframe: timeframe for the kline
:type timeframe: TIMEFRAME
:param timestamp: time of interest in seconds
:type timestamp: int
:param window: time window in seconds for the kline to look
:type window: int
:return: the Kline with an open time the closest to the provided timestamp
:rtype: Optional[Kline]
"""
table = KlineTable(asset, ref_asset, timeframe)
conditions_list = [
(table.open_timestamp,
SQLConditionEnum.greater_equal,
timestamp - window),
(table.open_timestamp,
SQLConditionEnum.lower,
timestamp + window)
]
order_list = [f'ABS({table.open_timestamp} - {timestamp})']
klines = self.get_conditions_rows(table, conditions_list=conditions_list, order_list=order_list)
if len(klines):
return self.row_to_kline(asset, ref_asset, timeframe, klines[0])
[docs] def drop_pair_table(self, asset: str, ref_asset: str, timeframe: TIMEFRAME):
"""
drop the table associated with a trading pair and a time frame
:param asset: asset of the trading pair
:type asset: str
:param ref_asset: reference asset of the trading pair
:type ref_asset: str
:param timeframe: timeframe for the kline
:type timeframe: TIMEFRAME
:return: None
:rtype: None
"""
table = KlineTable(asset, ref_asset, timeframe)
super().drop_table(table)
[docs] def row_to_kline(self, asset: str, ref_asset: str, timeframe: TIMEFRAME, row: Tuple):
"""
Take a row from the KlineDatabase and transform it into a Kline object
:param asset: asset of the trading pair
:type asset: str
:param ref_asset: reference asset of the trading pair
:type ref_asset: str
:param timeframe: timeframe for the kline
:type timeframe: TIMEFRAME
:param row: raw data from the database
:type row: Tuple
:return: the Kline corresponding to the args
:rtype: Kline
"""
return Kline(*row, asset=asset, ref_asset=ref_asset, timeframe=timeframe, source=self.name)
[docs] def get_cache_closest(self, asset: str, ref_asset: str, timeframe: TIMEFRAME,
timestamp: int) -> Tuple[Optional[int], int]:
"""
Look if a request for the timestamp has been saved in the cache
Return the cached timestamp of the previously selected kline along with the window used at that time
a timestamp of -1 means that no result were found
:param asset: asset of the trading pair
:type asset: str
:param ref_asset: reference asset of the trading pair
:type ref_asset: str
:param timeframe: timeframe for the kline
:type timeframe: TIMEFRAME
:param timestamp: request timestamp
:type timestamp: int
:return: cached_timestamp, window
:rtype: Optional[int], int
"""
table = KlineCacheTable(asset, ref_asset, timeframe)
row = self.get_row_by_key(table, timestamp)
if row is not None:
return row[1:]
else:
return None, -1
[docs] def add_cache_closest(self, asset: str, ref_asset: str, timeframe: TIMEFRAME, timestamp: int,
closest_timestamp: int, window: int):
"""
Save the result of a previous closest price request
:param asset: asset of the trading pair
:type asset: str
:param ref_asset: reference asset of the trading pair
:type ref_asset: str
:param timeframe: timeframe for the kline
:type timeframe: TIMEFRAME
:param timestamp: request timestamp
:type timestamp: int
:param closest_timestamp: the timestamp that got selected as the closest
:type closest_timestamp: int
:param window: time window in seconds that got used to fetch the klines
:type window: int
:return: None
:rtype: None
"""
table = KlineCacheTable(asset, ref_asset, timeframe)
row = (timestamp, closest_timestamp, window)
self.add_row(table, row, update_if_exists=True)
[docs] def drop_cache_tables(self):
"""
Delete all the cache tables stored in the database
:return: None
:rtype: None
"""
tables = [t[1] for t in self.get_tables_descriptions()]
tables = [table for table in tables if table.endswith('_cache')]
self.drop_tables(tables)