Source code for CryptoPrice.retrievers.MetaRetriever
from queue import Queue
from typing import List, Optional, Dict, Tuple, Iterator
from CryptoPrice.common.prices import Price, MetaPrice
from CryptoPrice.common.trade import TradingPair
from CryptoPrice.retrievers.AbstractRetriever import AbstractRetriever
[docs]class MetaRetriever(AbstractRetriever):
[docs] def __init__(self, retrievers: List[AbstractRetriever]):
self.retrievers = {r.name: r for r in retrievers}
super(MetaRetriever, self).__init__("meta_retriever")
[docs] def get_supported_pairs(self) -> List[TradingPair]:
"""
Return the list of trading pair supported by this retriever
:return: list of trading pairs
:rtype: List[TradingPair]
"""
supported_pairs = []
for _, retriever in self.retrievers.items():
supported_pairs.extend(retriever.supported_pairs)
return supported_pairs
def _get_closest_price(self, asset: str, ref_asset: str, timestamp: int) -> Optional[Price]:
"""
Will get the closest price possible in time for a trading pair asset/ref asset. If no price is found, return
None
:param asset: name of the asset in the trading pair (ex 'BTC' in 'BTCUSDT')
:type asset: str
:param ref_asset: name of the reference asset in the trading pair (ex 'USDT' in 'BTCUSDT')
:type ref_asset: str
:param timestamp: time to fetch the price needed (in seconds)
:type timestamp: int
:return: the price closest in time found or None if no price found
:rtype: Optional[Price]
"""
for _, retriever in self.retrievers.items():
price = retriever.get_closest_price(asset, ref_asset, timestamp)
if price is not None:
return price
[docs] def get_mean_price(self, asset: str, ref_asset: str, timestamp: int, preferred_assets: Optional[List[str]] = None,
max_depth: int = 3, max_depth_range: int = 0) -> Optional[MetaPrice]:
"""
Will use the method get_path_prices and return the mean price of an asset compared to a reference asset
on a given timestamp.
:param asset: name of the asset to get the price of
:type asset: str
:param ref_asset: name of the reference asset
:type ref_asset: str
:param timestamp: time to fetch the price needed (in seconds)
:type timestamp: int
:param preferred_assets: list of assets to construct the price path from. If None, default value is
['BTC', 'ETH']
:type preferred_assets: Optional[List[str]]
:param max_depth: maximum number of trading pair to use, default 3
:type max_depth: int
:param max_depth_range: maximum length difference between different trading path. If the first trading path has
a length of 1 and this parameter is equal to 2, trading_path with a length superior to 3 will be ignored
:type max_depth_range: int
:return: Metaprice reflecting the value calculated with a mean of trading path
:rtype: Optional[MetaPrice]
"""
meta_prices = []
min_depth = max_depth + 1
for meta_price in self.get_path_prices(asset, ref_asset, timestamp, preferred_assets,
max_depth, -1):
if meta_price is not None:
if min_depth > max_depth:
min_depth = len(meta_price.prices)
if len(meta_price.prices) - min_depth > max_depth_range:
break
else:
meta_prices.append(meta_price)
if len(meta_prices):
return MetaPrice.mean_from_meta_price(meta_prices)
[docs] def get_path_prices(self, asset: str, ref_asset: str, timestamp: int,
preferred_assets: Optional[List[str]] = None, max_depth: int = 2,
max_depth_range: int = -1) -> Iterator[MetaPrice]:
"""
Iterator that return MetaPrices that estimates the price of an asset compared to a reference asset.
It will use the trading pair at its disposal to create trading path from the asset to the ref asset.
It use a BFS algorithm, so the shortest path will be returned first.
If no price is found, return None
:param max_depth_range:
:type max_depth_range:
:param asset: name of the asset to get the price of
:type asset: str
:param ref_asset: name of the reference asset
:type ref_asset: str
:param timestamp: time to fetch the price needed (in seconds)
:type timestamp: int
:param preferred_assets: list of assets to construct the price path from. If None, default value is
['BTC', 'ETH']
:type preferred_assets: Optional[List[str]]
:param max_depth: maximum number of trading pair to use, default 2
:type max_depth: int
:param max_depth_range: maximum length difference between different trading path. If the first trading path has
a length of 1 and this parameter is equal to 2, trading_path with a length superior to 3 will be ignored.
Default -1 means that this parameter is ignored.
:type max_depth_range: int
:return: Metaprice reflecting the value calculated through a trading path
:rtype: Optional[MetaPrice]
"""
if asset == ref_asset:
yield MetaPrice(1, asset, ref_asset, [], source=set('',))
return
if preferred_assets is None:
preferred_assets = ['BTC', 'ETH']
assets_neighbours = self.construct_assets_neighbours(preferred_assets + [asset, ref_asset])
if asset not in assets_neighbours or ref_asset not in assets_neighbours:
yield None
return
seen_assets = [asset]
current_path = []
for assets_p, trade_p in self._explore_assets_path(ref_asset, assets_neighbours, seen_assets,
current_path, max_depth=max_depth,
max_depth_range=max_depth_range):
price_path = []
for pair in trade_p:
price = self.retrievers[pair.source].get_closest_price(pair.asset, pair.ref_asset, timestamp)
if price is not None:
price_path.append(price)
else:
break
if len(price_path) + 1 == len(assets_p): # all prices have been found
yield MetaPrice.from_price_path(assets_p, price_path)
[docs] def construct_assets_neighbours(self, asset_subsets: List[str]) -> Dict:
"""
Construct a dictionary of neighbours assets, with the trading pairs needed to get from one asset to another
:param asset_subsets: list of assets to use among supported assets
:type asset_subsets: List[str]
:return: assets_neighbours
:rtype: Dict
"""
assets_neighbours = {}
for pair in self.supported_pairs:
if pair.asset in asset_subsets and pair.ref_asset in asset_subsets:
try:
assets_neighbours[pair.asset][pair.ref_asset].append(pair)
except KeyError:
try:
assets_neighbours[pair.asset][pair.ref_asset] = [pair]
except KeyError:
assets_neighbours[pair.asset] = {pair.ref_asset: [pair]}
try:
assets_neighbours[pair.ref_asset][pair.asset].append(pair)
except KeyError:
try:
assets_neighbours[pair.ref_asset][pair.asset] = [pair]
except KeyError:
assets_neighbours[pair.ref_asset] = {pair.asset: [pair]}
return assets_neighbours
@staticmethod
def _explore_assets_path(target_asset: str, assets_neighbours: Dict,
seen_assets: List[str], current_path: List[TradingPair], max_depth: int = 3,
max_depth_range: int = -1) -> Iterator[Tuple[List[str], List[TradingPair]]]:
"""
Iterator that will explore the trading possibilities to link one asset to another one through a trading path
:param target_asset: asset to look for
:type target_asset: str
:param assets_neighbours: dictionary of neighbours for assets
:type assets_neighbours: Dict
:param seen_assets: assets already seen on the path
:type seen_assets: List[str]
:param current_path: trading pair to use (in order) to follow the path to the target asset
:type current_path: List[TradingPair]
:param max_depth: maximum number of trading pair to use, default 3
:type max_depth: int
:param max_depth_range: maximum length difference between different trading path. If the first trading path has
a length of 1 and this parameter is equal to 2, trading_path with a length superior to 3 will be ignored.
Default -1 means that this parameter is ignored.
:type max_depth_range: int
:return: list of assets to explore and the trading path to take
:rtype: Tuple[List[str], List[TradingPair]]
"""
to_explore = Queue()
to_explore.put((seen_assets, current_path))
min_depth = None
while to_explore.qsize():
seen_assets, current_path = to_explore.get()
current_asset = seen_assets[-1]
if min_depth is not None and len(current_path) + 1 - min_depth > max_depth_range:
break
for next_asset in assets_neighbours[current_asset]:
if next_asset not in seen_assets:
for pair in assets_neighbours[current_asset][next_asset]:
if next_asset == target_asset:
if min_depth is None and max_depth_range >= 0:
min_depth = len(current_path) + 1
yield seen_assets + [next_asset], current_path + [pair]
elif len(current_path) + 1 < max_depth:
to_explore.put((seen_assets + [next_asset], current_path + [pair]))