import asyncio
from time import sleep  # noqa: F401 - pre-existing module-level name, kept for importers
from typing import Dict, Optional

from bfxbot.bfxwrapper import BfxWrapper
from bfxbot.currency import Symbol
from bfxbot.models import SymbolStatus, Ticker, EventHandler, Strategy


class BfxBot:
    """Keep a per-symbol ``SymbolStatus`` in sync with the exchange wrapper.

    The bot owns a ``BfxWrapper`` (built from the API credentials), a
    ``Ticker`` and a ``status`` dictionary mapping each ``Symbol`` to its
    ``SymbolStatus``.

    Note:
        ``status`` is an instance attribute (a dict).  It shadows the
        ``status()`` method defined on the class, so ``bot.status(symbol)``
        cannot be called on an instance; use ``bot.status[symbol]`` or
        ``bot.symbol_status(symbol)`` instead.
    """

    def __init__(self, api_key: str, api_secret: str, tick_duration: int = 1):
        """Create the wrapper, an empty status map and the ticker.

        Args:
            api_key: Key forwarded to ``BfxWrapper``.
            api_secret: Secret forwarded to ``BfxWrapper``.
            tick_duration: Value forwarded to ``Ticker``; ``update()`` later
                sleeps for ``ticker.seconds`` (presumably derived from this
                value - confirm against ``Ticker``).
        """
        self.bfx: BfxWrapper = BfxWrapper(api_key, api_secret)
        self.status: Dict[Symbol, SymbolStatus] = {}
        self.ticker: Ticker = Ticker(tick_duration)

    def _status_for(self, symbol: Symbol) -> SymbolStatus:
        """Return the status entry for *symbol*, creating it if missing."""
        if symbol not in self.status:
            self.status[symbol] = SymbolStatus(symbol)
        return self.status[symbol]

    async def __update_status__(self):
        """Fetch active positions and orders and record them per symbol.

        Positions are fetched once for the whole account; orders are fetched
        once per symbol already present in ``self.status``.
        """
        # NOTE(review): entries are added on every refresh; whether
        # SymbolStatus de-duplicates them is not visible from this file.
        active_positions = await self.bfx.get_active_position()

        for position in active_positions:
            symbol = Symbol.from_str(position.symbol)
            await self._status_for(symbol).add_position(position)

        # Iterate over a snapshot: the loop body awaits, so another task may
        # add a symbol (e.g. through set_strategy) while we are suspended,
        # which would otherwise raise "dictionary changed size during
        # iteration".
        for symbol in list(self.status):
            active_orders = await self.bfx.get_active_orders(symbol)

            for order in active_orders:
                # NOTE(review): add_position is awaited above but add_order
                # is not; kept as in the original - confirm against
                # SymbolStatus whether add_order is a coroutine.
                self._status_for(symbol).add_order(order)

    def event_handler(self, symbol) -> Optional[EventHandler]:
        """Return the event handler (``.eh``) of *symbol*, or ``None`` if unknown."""
        entry = self.status.get(symbol)
        if entry is None:
            return None
        return entry.eh

    def symbol_status(self, symbol: Symbol) -> Optional[SymbolStatus]:
        """Return the ``SymbolStatus`` tracked for *symbol*, or ``None`` if unknown."""
        return self.status.get(symbol)

    def status(self, symbol: Symbol) -> Optional[SymbolStatus]:
        """Return the ``SymbolStatus`` tracked for *symbol*, or ``None`` if unknown.

        Shadowed on instances by the ``status`` dict assigned in ``__init__``;
        only reachable as ``BfxBot.status(bot, symbol)``.  Prefer
        ``symbol_status()``.
        """
        return self.symbol_status(symbol)

    async def start(self):
        """Perform the initial status refresh."""
        await self.__update_status__()

    async def update(self):
        """Advance the ticker, wait one tick, then refresh the status."""
        self.ticker.inc()
        # Non-blocking wait: time.sleep() here would freeze the event loop
        # (and every other coroutine on it) for the whole tick.
        await asyncio.sleep(self.ticker.seconds)
        await self.__update_status__()

    def set_strategy(self, symbol, strategy: Strategy):
        """Attach *strategy* to *symbol*, creating its status entry if needed."""
        if symbol in self.status:
            self.status[symbol].strategy = strategy
        else:
            self.status[symbol] = SymbolStatus(symbol, strategy)