Source code for CryptoPrice.retrievers.AbstractRetriever

from abc import ABC, abstractmethod
from typing import Optional, List

from CryptoPrice.common.prices import Price
from CryptoPrice.common.trade import TradingPair
from CryptoPrice.utils.LoggerGenerator import LoggerGenerator


[docs]class AbstractRetriever(ABC): MAX_API_RETRY = 3
[docs] def __init__(self, name: str): self.name = name self.logger = LoggerGenerator.get_logger(self.name) self.supported_pairs = self.get_supported_pairs()
[docs] @abstractmethod def get_supported_pairs(self) -> List[TradingPair]: """ Return the list of trading pair supported by this retriever :return: list of trading pairs :rtype: List[TradingPair] """ raise NotImplementedError
[docs] def get_closest_price(self, asset: str, ref_asset: str, timestamp: int) -> Optional[Price]: """ Will get the closest price possible in time for a trading pair asset/ref asset. If no price is found, return None :param asset: name of the asset in the trading pair (ex 'BTC' in 'BTCUSDT') :type asset: str :param ref_asset: name of the reference asset in the trading pair (ex 'USDT' in 'BTCUSDT') :type ref_asset: str :param timestamp: time to fetch the price needed (in seconds) :type timestamp: int :return: the price closest in time found or None if no price found :rtype: Optional[Price] """ if asset == ref_asset: return Price(1, asset, ref_asset, timestamp, source='') trading_pair = TradingPair('', asset, ref_asset, '') if trading_pair not in self.supported_pairs: return return self._get_closest_price(asset, ref_asset, timestamp)
@abstractmethod def _get_closest_price(self, asset: str, ref_asset: str, timestamp: int) -> Optional[Price]: """ Will get the closest price possible in time for a trading pair asset/ref asset. If no price is found, return None :param asset: name of the asset in the trading pair (ex 'BTC' in 'BTCUSDT') :type asset: str :param ref_asset: name of the reference asset in the trading pair (ex 'USDT' in 'BTCUSDT') :type ref_asset: str :param timestamp: time to fetch the price needed (in seconds) :type timestamp: int :return: the price closest in time found or None if no price found :rtype: Optional[Price] """ raise NotImplementedError