core/bfxbot/bfxbot.py
2020-12-24 13:47:33 +00:00

152 lines
5.1 KiB
Python

import asyncio
import time
from typing import Dict, List, Optional, Tuple
from bfxapi import Order
from bfxbot.bfxwrapper import BfxWrapper
from bfxbot.currency import TradingPair, Symbol
from bfxbot.models import SymbolStatus, Ticker, EventHandler, Strategy, Event, EventKind, OFFER_PERC, PositionWrapper
class BfxBot:
def __init__(self, api_key: str, api_secret: str, symbols: List[TradingPair], quote: Symbol,
tick_duration: int = 1):
if api_key is None:
print("API_KEY is not set!")
raise ValueError
if api_secret is None:
print("API_SECRET is not set!")
raise ValueError
self.__bfx: BfxWrapper = BfxWrapper(api_key, api_secret)
self.__ticker: Ticker = Ticker(tick_duration)
self.__status: Dict[TradingPair, SymbolStatus] = {}
self.__quote: Symbol = quote
self.__account_info = None
self.__ledger = None
if isinstance(symbols, TradingPair):
symbols = [symbols]
self.symbols: List[TradingPair] = symbols
# init symbol statuses
for s in self.symbols:
self.__status[s] = SymbolStatus(s)
def __position_wrapper_from_id(self, position_id) -> Tuple[Optional[PositionWrapper], Optional[SymbolStatus]]:
for s in self.__status.values():
pw = s.active_position_wrapper_from_id(position_id)
if pw:
return pw, s
return None, None
async def __update_status__(self):
active_positions = await self.__bfx.get_active_position()
for symbol in self.__status:
# updating tick
self.__status[symbol].__init_tick__(self.__ticker.current_tick)
# updating last price
last_price = await self.__bfx.get_current_prices(symbol)
last_price = last_price[0]
self.__status[symbol].set_tick_price(self.__ticker.current_tick, last_price)
# updating positions
symbol_positions = [x for x in active_positions if x.symbol == str(symbol)]
for p in symbol_positions:
await self.__status[TradingPair.from_str(p.symbol)].add_position(p)
# updating orders
active_orders = await self.__bfx.get_active_orders(symbol)
for o in active_orders:
self.__status[symbol].add_order(o)
# emitting new tick event
# TODO: handle _on_new_tick() from Strategy
await self.__status[symbol].add_event(Event(EventKind.NEW_TICK, self.__ticker.current_tick))
async def best_position_closing_price(self, position_id: int) -> Optional[float]:
pw, _ = self.__position_wrapper_from_id(position_id)
if not pw:
return None
is_long_pos = pw.position.amount < 0
pub_tick = await self.__bfx.get_public_ticker(pw.position.symbol)
bid_price = pub_tick[0]
ask_price = pub_tick[2]
if is_long_pos:
closing_price = bid_price * (1 - OFFER_PERC / 100)
else:
closing_price = ask_price * (1 + OFFER_PERC / 100)
return closing_price
def close_order(self, symbol: TradingPair, order_id: int):
print(f"I would have closed order {order_id} for {symbol}")
async def close_position(self, position_id: int):
pw, ss = self.__position_wrapper_from_id(position_id)
if not pw:
print("Could not find open position!")
return
closing_price = await self.best_position_closing_price(pw.position.id)
amount = pw.position.amount * -1
open_orders = await self.__bfx.get_active_orders(pw.position.symbol)
if not open_orders:
await self.__bfx.submit_order(pw.position.symbol, closing_price, amount, Order.Type.LIMIT)
await ss.add_event(Event(EventKind.ORDER_SUBMITTED, ss.current_tick))
async def get_balances(self):
return await self.__bfx.get_current_balances(self.__quote)
async def get_profit_loss(self, start: int, end: int):
return await self.__bfx.profit_loss(start, end, self.__ledger, self.__quote)
def set_strategy(self, symbol, strategy: Strategy):
if symbol in self.__status:
self.__status[symbol].strategy = strategy
else:
self.__status[symbol] = SymbolStatus(symbol, strategy)
async def start(self):
self.__account_info = self.__bfx.get_account_information()
self.__ledger = await self.__bfx.ledger_history(0, time.time() * 1000)
await self.__update_status__()
def symbol_event_handler(self, symbol) -> Optional[EventHandler]:
if symbol not in self.__status:
return None
return self.__status[symbol].eh
def symbol_status(self, symbol: TradingPair) -> Optional[SymbolStatus]:
if symbol not in self.__status:
return None
return self.__status[symbol]
async def update(self):
await asyncio.sleep(self.__ticker.seconds)
self.__ticker.inc()
await self.__update_status__()
async def __update_ledger(self):
self.__ledger = await self.__bfx.ledger_history(0, time.time() * 1000)