from typing import List, Tuple

from bfxapi.rest.bfx_rest import BfxRest
from retrying_async import retry

from bfxbot.currency import (
    TradingPair,
    Balance,
    WalletKind,
    OrderType,
    Direction,
    Currency,
    BalanceGroup,
    Symbol,
)
from bfxbot.utils import average


class BfxWrapper(BfxRest):
    """`BfxRest` client with retrying request methods and account helpers.

    The overridden methods add `@retry()` on top of the parent implementation
    and accept a `TradingPair` wherever the parent takes a symbol string. The
    remaining methods build balances, ledger history and profit/loss figures
    on top of those requests.
    """

    # default timeframe (in milliseconds) when retrieving old prices
    DEFAULT_TIME_DELTA = 5 * 60 * 1000

    def __init__(self, api_key: str, api_secret: str):
        """Create the client with the given API credentials."""
        super().__init__(API_KEY=api_key, API_SECRET=api_secret)

    #######################################
    # OVERRIDDEN METHODS TO IMPLEMENT RETRY
    #######################################

    @retry()
    async def get_public_ticker(self, symbol):
        """Retrying `get_public_ticker`; `symbol` may be a `TradingPair` or a string."""
        if isinstance(symbol, TradingPair):
            symbol = str(symbol)

        return await super().get_public_ticker(symbol)

    @retry()
    async def get_active_position(self):
        """Retrying `get_active_position`."""
        return await super().get_active_position()

    @retry()
    async def get_active_orders(self, symbol):
        """Retrying `get_active_orders`; `symbol` may be a `TradingPair` or a string."""
        if isinstance(symbol, TradingPair):
            symbol = str(symbol)

        return await super().get_active_orders(symbol)

    @retry()
    async def get_trades(self, symbol, start, end):
        """Retrying `get_trades`; `symbol` may be a `TradingPair` or a string."""
        if isinstance(symbol, TradingPair):
            symbol = str(symbol)

        return await super().get_trades(symbol, start, end)

    @retry()
    async def post(self, endpoint: str, data=None, params=""):
        """Retrying `post`; a missing `data` payload is sent as an empty dict."""
        if data is None:
            data = {}

        return await super().post(endpoint, data, params)

    ################################
    # NEW METHODS
    ################################

    async def _average_trade_price(self, currency, quote: Symbol, time: int):
        """Average the public trade prices of `currency` against `quote` around `time`.

        Trades are requested for the `t{currency}{quote}` pair within
        `DEFAULT_TIME_DELTA` milliseconds on either side of `time`; index 3 of
        each returned trade is used as its price.
        """
        # NOTE(review): behaviour when no trade falls in the window depends on
        # `average` (defined in bfxbot.utils) - confirm it copes with an empty list.
        trades = await self.get_public_trades(
            symbol=f"t{currency}{quote}",
            start=time - self.DEFAULT_TIME_DELTA,
            end=time + self.DEFAULT_TIME_DELTA,
        )

        return average([trade[3] for trade in trades])

    async def account_movements_between(self, start: int, end: int, ledger, quote: Symbol) -> BalanceGroup:
        """Collect the deposits recorded in `ledger` between `start` and `end`.

        Only entries whose timestamp (index 3) lies in `[start, end]` and whose
        description (index 8) starts with "deposit" (case-insensitive) are
        considered. Each deposit is priced in `quote` using the average trade
        price around the time of the entry.

        Returns:
            A `BalanceGroup` holding one `Balance` per matching ledger entry.
        """
        movements = BalanceGroup(quote)

        # TODO: Parallelize this
        for entry in ledger:
            timestamp = entry[3]
            if not start <= timestamp <= end:
                continue

            description: str = entry[8]
            if not description.lower().startswith("deposit"):
                continue

            currency = entry[1]
            amount = entry[5]

            if currency != str(quote):
                currency_price = await self._average_trade_price(currency, quote, timestamp)
                c = Currency(currency, amount, currency_price)
            else:
                # The quote currency needs no conversion, so no price is looked up.
                c = Currency(currency, amount)

            movements.add_balance(Balance(c, quote))

        return movements

    async def balance_at(self, time: int, ledger, quote: Symbol):
        """Rebuild the per-currency balances as of `time` from `ledger`.

        For every currency, the first ledger entry with a timestamp not later
        than `time` is used, so `ledger` must be ordered newest first (the
        order `ledger_history` returns). Index 6 of the entry is taken as the
        amount held.
        # NOTE(review): index 6 is presumably the running balance after the
        # entry - confirm against the ledger endpoint documentation.

        Returns:
            A `BalanceGroup` with one `Balance` per currency found.
        """
        bg = BalanceGroup(quote)

        # TODO: Parallelize this
        for entry in ledger:
            if entry[3] > time:
                continue

            currency = entry[1]
            amount = entry[6]

            # Only the first (most recent) entry of each currency counts.
            if currency in bg.currency_names():
                continue

            if currency != str(quote):
                currency_price = await self._average_trade_price(currency, quote, time)
                c = Currency(currency, amount, currency_price)
            else:
                c = Currency(currency, amount)

            bg.add_balance(Balance(c, quote))

        return bg

    # Calculate the average execution price for Trading or rate for Margin funding.
    async def calculate_execution_price(self, pair: str, amount: float):
        """Return the first element of the `/calc/trade/avg` response for `pair` and `amount`."""
        api_path = "/calc/trade/avg"

        res = await self.post(api_path, {
            'symbol': pair,
            'amount': amount
        })

        return res[0]

    async def get_account_information(self):
        """Return the raw response of the `auth/r/info/user` endpoint."""
        api_path = "auth/r/info/user"

        return await self.post(api_path)

    async def get_current_balances(self, quote: Symbol) -> BalanceGroup:
        """Build the current balances of every recognised wallet, valued in `quote`.

        Wallets whose type `WalletKind.from_str` does not recognise are
        skipped. A wallet held in the quote currency itself is added without a
        price, as `balance_at` and `account_movements_between` do, instead of
        requesting an execution price for a `t{quote}{quote}` pair.
        """
        bg: BalanceGroup = BalanceGroup(quote)

        wallets = await self.get_wallets()

        for w in wallets:
            kind = WalletKind.from_str(w.type)

            if not kind:
                continue

            if w.currency != str(quote):
                execution_price = await self.calculate_execution_price(f"t{w.currency}{quote}", w.balance)
                c = Currency(w.currency, w.balance, execution_price)
            else:
                c = Currency(w.currency, w.balance)

            bg.add_balance(Balance(c, quote, kind))

        return bg

    async def get_current_prices(self, symbol: TradingPair) -> Tuple[float, float, float]:
        """Return `(bid, ask, last)` taken from indexes 0, 2 and 6 of the public ticker.

        `symbol` may be a `TradingPair` or an already formatted symbol string.
        """
        if isinstance(symbol, TradingPair):
            symbol = str(symbol)

        tickers = await self.get_public_ticker(symbol)

        bid_price = tickers[0]
        ask_price = tickers[2]
        ticker_price = tickers[6]

        return bid_price, ask_price, ticker_price

    @staticmethod
    def _timeframe_boundaries(start: int, end: int, increments: int = 10) -> List[int]:
        """Split `[start, end]` into roughly `increments` consecutive windows.

        Returns the ordered window boundaries. The list always begins with
        `start` and ends with `end`, so consecutive pairs cover the whole range.

        Raises:
            ValueError: if `end` is before `start`.
        """
        if end < start:
            raise ValueError(f"end ({end}) must not be before start ({start})")

        # A step of at least 1 keeps range() valid for spans shorter than `increments`.
        step = max(1, (end - start) // increments)

        # range() excludes `end`, so it is appended explicitly; otherwise the
        # last window of the requested range would never be queried.
        boundaries = list(range(start, end, step)) or [start]
        boundaries.append(end)

        return boundaries

    async def ledger_history(self, start, end):
        """Fetch the ledger entries between `start` and `end`, newest first.

        The range is queried in consecutive windows of at most 2500 entries
        each, and the merged result is sorted by timestamp (index 3) in
        descending order.

        Raises:
            ValueError: if `end` is before `start`.
        """
        api_path = "auth/r/ledgers/hist"
        boundaries = self._timeframe_boundaries(int(start), int(end))

        history = []

        # NOTE(review): consecutive windows share a boundary timestamp; if the
        # endpoint treats both `start` and `end` as inclusive, an entry exactly
        # on a boundary is returned twice - confirm and de-duplicate if needed.
        # TODO: Parallelize this
        for window_start, window_end in zip(boundaries, boundaries[1:]):
            history.extend(
                await self.post(api_path, {'start': window_start, 'end': window_end, 'limit': 2500})
            )

        history.sort(key=lambda ledger_entry: ledger_entry[3], reverse=True)

        return history

    async def maximum_order_amount(self, symbol: TradingPair, direction: Direction,
                                   order_type: OrderType = OrderType.EXCHANGE,
                                   rate: int = 1):
        """Return the raw `auth/calc/order/avail` response for the given order parameters."""
        api_path = "auth/calc/order/avail"

        return await self.post(api_path,
                               {'symbol': str(symbol), 'type': order_type.value, "dir": direction.value, "rate": rate})

    async def profit_loss(self, start: int, end: int, ledger, quote: Symbol):
        """Compute the profit/loss between `start` and `end`, valued in `quote`.

        The baseline is the balance at `start` plus the deposits made in the
        period; the profit/loss is the balance at `end` minus that baseline.

        Returns:
            A `(profit_loss, profit_loss_percentage)` tuple. The percentage is
            relative to the baseline and is reported as 0.0 when the baseline
            is zero, where a relative change is undefined.

        Raises:
            ValueError: if `start` is after `end`.
        """
        if start > end:
            raise ValueError(f"start ({start}) must not be after end ({end})")

        start_bg = await self.balance_at(start, ledger, quote)
        end_bg = await self.balance_at(end, ledger, quote)
        movements_bg = await self.account_movements_between(start, end, ledger, quote)

        start_quote = start_bg.quote_equivalent()
        end_quote = end_bg.quote_equivalent()
        movements_quote = movements_bg.quote_equivalent()

        baseline = start_quote + movements_quote
        profit_loss = end_quote - baseline

        # Guard against an empty starting account with no deposits.
        profit_loss_percentage = profit_loss / baseline * 100 if baseline else 0.0

        return profit_loss, profit_loss_percentage