import inspect
from typing import Dict

from bfxapi import Position

from bfxbot.bfxwrapper import BfxWrapper
from bfxbot.event import Event, EventKind
from bfxbot.status import Status, PositionState, Ticker


class BfxBot:
    """Bot that tracks per-symbol status through a ``BfxWrapper``.

    The bot keeps one ``Status`` object per symbol in ``self.status`` and
    refreshes it from the exchange wrapper on every ``update()`` call.
    Callbacks are registered through ``self.eh`` (an ``EventHandler``).

    ``__init__`` performs no network I/O.  Use ``await BfxBot.create(...)``
    to get an instance whose status has already been loaded once, or call
    ``await bot.update()`` after plain construction.
    """

    class EventHandler:
        """Registry and dispatcher for event and position-state callbacks."""

        def __init__(self):
            # Maps ``EventKind.value`` -> list of handlers, in registration order.
            self.event_handlers = {}
            # Maps ``PositionState`` -> list of handlers, in registration order.
            self.state_handlers = {}

        async def call_event(self, event: Event, status: Status):
            """Invoke every handler registered for ``event.kind``.

            Handlers are called in registration order with ``(event, status)``.
            Coroutine functions are awaited; plain callables are called
            synchronously.  Nothing happens if no handler is registered.
            """
            for handler in self.event_handlers.get(event.kind.value, ()):
                if inspect.iscoroutinefunction(handler):
                    await handler(event, status)
                else:
                    handler(event, status)

        async def call_state(self, state: PositionState, status: Status):
            """Invoke every handler registered for ``state``.

            Handlers are called in registration order with ``(status,)``.
            Coroutine functions are awaited; plain callables are called
            synchronously.  Nothing happens if no handler is registered.
            """
            for handler in self.state_handlers.get(state, ()):
                if inspect.iscoroutinefunction(handler):
                    await handler(status)
                else:
                    handler(status)

        def on_event(self, kind: EventKind):
            """Return a decorator registering a handler for events of ``kind``.

            The decorated function is returned unchanged, so it stays usable
            under its own name.
            """
            value = kind.value

            def registerhandler(handler):
                self.event_handlers.setdefault(value, []).append(handler)
                return handler

            return registerhandler

        def on_state(self, state: PositionState):
            """Return a decorator registering a handler for ``state``.

            The decorated function is returned unchanged, so it stays usable
            under its own name.
            """

            def registerhandler(handler):
                self.state_handlers.setdefault(state, []).append(handler)
                return handler

            return registerhandler

    def __init__(self, api_key: str, api_secret: str, tick_duration: int = 60):
        """Build the wrapper, the handler registry and the ticker.

        No status is fetched here: ``__init__`` cannot ``await``.  The
        original code awaited ``__update_status__`` from this synchronous
        method, which is a SyntaxError.  Use ``BfxBot.create`` for an
        instance with its status already loaded.
        """
        self.bfx: BfxWrapper = BfxWrapper(api_key, api_secret)
        self.status: Dict[str, Status] = {}
        self.eh: BfxBot.EventHandler = BfxBot.EventHandler()
        self.ticker: Ticker = Ticker(tick_duration)

    @classmethod
    async def create(cls, api_key: str, api_secret: str, tick_duration: int = 60) -> "BfxBot":
        """Construct a bot and load its status once before returning it.

        This is the awaitable replacement for the status refresh that the
        constructor tried to perform.
        """
        bot = cls(api_key, api_secret, tick_duration)
        await bot.__update_status__()
        return bot

    async def __update_status__(self):
        """Record active positions and orders under the current tick.

        A ``Status`` entry is created for every symbol that has an active
        position and is not tracked yet.  Active orders are then fetched
        for each tracked symbol.
        """
        tick = self.ticker.current_tick

        active_positions = await self.bfx.get_active_position()
        for position in active_positions:
            if position.symbol not in self.status:
                self.status[position.symbol] = Status(position.symbol)
            # NOTE(review): presumably ``positions`` yields an appendable list
            # for any tick key (e.g. a defaultdict) -- confirm against Status.
            self.status[position.symbol].positions[tick].append(position)

        # Iterate a snapshot of the keys: the loop body awaits, and a dict
        # that changes size during iteration raises RuntimeError.
        for symbol in list(self.status):
            active_orders = await self.bfx.get_active_orders(symbol)
            for order in active_orders:
                self.status[symbol].orders[tick].append(order)

    def __trigger__events(self):
        """Placeholder; currently does nothing."""
        return

    async def update(self):
        """Advance the ticker by one tick and refresh the status."""
        self.ticker.inc()
        await self.__update_status__()