core/bfxbot/bfxbot.py
2020-12-07 12:27:32 +00:00

68 lines
2.0 KiB
Python

import asyncio
from time import sleep
from typing import Dict, Optional

from bfxbot.bfxwrapper import BfxWrapper
from bfxbot.currency import Symbol
from bfxbot.models import SymbolStatus, Ticker, EventHandler, Strategy
class BfxBot:
    """Keeps per-symbol state in sync with what a ``BfxWrapper`` reports.

    Attributes:
        bfx: wrapper built from the API credentials; queried for the active
            positions and the active orders.
        status: mapping from symbol to the ``SymbolStatus`` tracked for it.
            It supports both ``bot.status[symbol]`` and ``bot.status(symbol)``.
        ticker: ``Ticker`` built from ``tick_duration``; advanced on every
            call to ``update``.
    """

    class _StatusMap(dict):
        """Dict of per-symbol state that can also be called as a lookup.

        The bot keeps its state in an instance attribute named ``status`` and
        also declares a ``status`` method. Instance attributes win over
        methods during attribute lookup, so ``bot.status(symbol)`` ends up
        calling the stored mapping itself. With a plain ``dict`` that raises
        ``TypeError``; making the mapping callable keeps both spellings
        working.
        """

        def __call__(self, symbol):
            """Return the entry stored for ``symbol``, or ``None``."""
            return self.get(symbol)

    def __init__(self, api_key: str, api_secret: str, tick_duration: int = 1):
        """Build the exchange wrapper, an empty status map and the ticker.

        Args:
            api_key: API key handed to ``BfxWrapper``.
            api_secret: API secret handed to ``BfxWrapper``.
            tick_duration: value handed to ``Ticker``.

        Raises:
            ValueError: if ``api_key`` or ``api_secret`` is ``None``.
        """
        # The message travels with the exception so callers and logs see it,
        # instead of being printed to stdout next to a message-less error.
        if api_key is None:
            raise ValueError("API_KEY is not set!")
        if api_secret is None:
            raise ValueError("API_SECRET is not set!")

        self.bfx: BfxWrapper = BfxWrapper(api_key, api_secret)
        self.status: Dict[Symbol, SymbolStatus] = self._StatusMap()
        self.ticker: Ticker = Ticker(tick_duration)

    async def __update_status__(self):
        """Refresh ``status`` from the active positions and active orders.

        Every active position is added to the ``SymbolStatus`` of its symbol,
        creating that entry when the symbol is not tracked yet. Afterwards the
        active orders of every tracked symbol are fetched and added too.

        The dunder-style name is kept so existing callers keep working.
        """
        for position in await self.bfx.get_active_position():
            symbol = Symbol.from_str(position.symbol)
            if symbol not in self.status:
                self.status[symbol] = SymbolStatus(symbol)
            await self.status[symbol].add_position(position)

        # Walk a snapshot of the keys: the loop body awaits, and any entry
        # inserted into ``status`` while this coroutine is suspended (for
        # example through ``set_strategy``) would otherwise abort the
        # iteration with "dictionary changed size during iteration".
        for symbol in list(self.status):
            for order in await self.bfx.get_active_orders(symbol):
                if symbol not in self.status:
                    self.status[symbol] = SymbolStatus(symbol)
                # NOTE(review): unlike add_position, add_order is not awaited
                # here; presumably it is synchronous - confirm in SymbolStatus.
                self.status[symbol].add_order(order)

    def event_handler(self, symbol) -> "Optional[EventHandler]":
        """Return the ``eh`` attribute of the status tracked for ``symbol``.

        Returns ``None`` when the symbol is not tracked.
        """
        symbol_status = self.status.get(symbol)
        return None if symbol_status is None else symbol_status.eh

    def status(self, symbol: "Symbol") -> "Optional[SymbolStatus]":
        """Return the status tracked for ``symbol``, or ``None``.

        On an instance this method is shadowed by the ``status`` attribute
        assigned in ``__init__``; that attribute is callable and performs the
        same lookup. This definition still serves explicit class-level calls
        such as ``BfxBot.status(bot, symbol)``.
        """
        return self.status.get(symbol)

    async def start(self):
        """Fill ``status`` once by running ``__update_status__``."""
        await self.__update_status__()

    async def update(self):
        """Advance the ticker, wait ``ticker.seconds`` and refresh ``status``."""
        self.ticker.inc()
        # asyncio.sleep suspends only this coroutine; a blocking time.sleep
        # would stall every other task scheduled on the same event loop.
        await asyncio.sleep(self.ticker.seconds)
        await self.__update_status__()

    def set_strategy(self, symbol, strategy: "Strategy"):
        """Attach ``strategy`` to ``symbol``.

        An already tracked symbol gets its ``strategy`` attribute replaced;
        otherwise a new ``SymbolStatus`` is created with the strategy.
        """
        if symbol in self.status:
            self.status[symbol].strategy = strategy
        else:
            self.status[symbol] = SymbolStatus(symbol, strategy)