import asyncio
from time import sleep  # no longer used in this module; kept in case other code imports it from here
from typing import Dict, List, Optional

from bfxbot.bfxwrapper import BfxWrapper
from bfxbot.currency import Symbol
from bfxbot.models import SymbolStatus, Ticker, EventHandler, Strategy, Event, EventKind


class _StatusMap(dict):
    """Symbol -> SymbolStatus dict that can also be called as ``status(symbol)``.

    ``BfxBot`` stores its per-symbol state in an instance attribute named
    ``status`` and also declares a ``status(symbol)`` accessor. The instance
    attribute shadows the method, so a plain dict made ``bot.status(symbol)``
    fail with ``TypeError``. Making the mapping callable keeps both spellings
    working.
    """

    def __call__(self, symbol):
        """Return the entry stored for ``symbol``, or ``None`` if there is none."""
        return self.get(symbol)


class BfxBot:
    """Keep a per-symbol status up to date by polling a ``BfxWrapper`` once per tick."""

    def __init__(self, api_key: str, api_secret: str, symbols: List[Symbol], tick_duration: int = 1):
        """Create the API wrapper, the ticker and one ``SymbolStatus`` per symbol.

        Args:
            api_key: API key handed to ``BfxWrapper``.
            api_secret: API secret handed to ``BfxWrapper``.
            symbols: Symbols to track; a single ``Symbol`` is accepted and
                wrapped in a list.
            tick_duration: Value handed to ``Ticker``.

        Raises:
            ValueError: If ``api_key`` or ``api_secret`` is ``None``.
        """
        if api_key is None:
            raise ValueError("API_KEY is not set!")

        if api_secret is None:
            raise ValueError("API_SECRET is not set!")

        self.bfx: BfxWrapper = BfxWrapper(api_key, api_secret)
        # Callable mapping: supports both bot.status[symbol] and bot.status(symbol).
        self.status: Dict[Symbol, SymbolStatus] = _StatusMap()
        self.ticker: Ticker = Ticker(tick_duration)

        if isinstance(symbols, Symbol):
            symbols = [symbols]

        self.symbols: List[Symbol] = symbols

        # init symbol statuses
        for s in self.symbols:
            self.status[s] = SymbolStatus(s)

    async def __update_status__(self):
        """Refresh every tracked symbol for the current tick and emit ``NEW_TICK``.

        Active positions are fetched once; then, for each symbol, the current
        tick and price are recorded, the positions belonging to that symbol
        are added, and a ``NEW_TICK`` event is emitted on its status.
        """
        active_positions = await self.bfx.get_active_position()
        current_tick = self.ticker.current_tick

        for symbol, symbol_status in self.status.items():
            # updating tick
            symbol_status.current_tick = current_tick

            # updating last price
            # First element of the response is used as the price
            # (presumably the most recent one -- TODO confirm against BfxWrapper).
            prices = await self.bfx.get_current_prices(symbol)
            symbol_status.set_price(current_tick, prices[0])

            # updating positions
            # The filter already guarantees the position belongs to `symbol`,
            # so it is stored under that key directly instead of re-parsing
            # p.symbol back into a Symbol.
            symbol_name = str(symbol)
            for p in active_positions:
                if p.symbol == symbol_name:
                    await symbol_status.add_position(p)

            # TODO: active orders are not synchronised yet
            # (would use self.bfx.get_active_orders(symbol) and add_order()).

            # emitting event
            # The second argument (0) is passed through unchanged; its meaning
            # is defined by Event -- TODO confirm.
            await symbol_status.__add_event__(Event(EventKind.NEW_TICK, 0, current_tick))

    def event_handler(self, symbol) -> Optional[EventHandler]:
        """Return the event handler (``eh``) of ``symbol``, or ``None`` if it is not tracked."""
        symbol_status = self.status.get(symbol)
        return None if symbol_status is None else symbol_status.eh

    def status(self, symbol: Symbol) -> Optional[SymbolStatus]:
        """Return the status of ``symbol``, or ``None`` if it is not tracked.

        On instances this method is shadowed by the ``status`` attribute set
        in ``__init__``; that attribute is a callable mapping which gives the
        same result for ``bot.status(symbol)``. The method is kept so that
        class-level access (``BfxBot.status(bot, symbol)``) keeps working.
        """
        return self.status.get(symbol)

    async def start(self):
        """Run the first status refresh without advancing the ticker."""
        await self.__update_status__()

    async def update(self):
        """Advance the ticker, wait for the tick duration, then refresh all statuses."""
        self.ticker.inc()
        # asyncio.sleep yields to the event loop; time.sleep would block every
        # other coroutine for the whole tick.
        await asyncio.sleep(self.ticker.seconds)
        await self.__update_status__()

    def set_strategy(self, symbol, strategy: Strategy):
        """Attach ``strategy`` to ``symbol``, creating its status if it is not tracked yet."""
        if symbol in self.status:
            self.status[symbol].strategy = strategy
        else:
            self.status[symbol] = SymbolStatus(symbol, strategy)