changed visibility of bfxbot attributes. removed pid from event class. added while loop to avoid race condition on socket.io first_connect event

This commit is contained in:
Giulio De Pasquale 2020-12-15 15:52:57 +00:00
parent f930a849f0
commit a7b1d05029
4 changed files with 53 additions and 38 deletions

View File

@ -1,5 +1,6 @@
from time import sleep from time import sleep
from typing import Dict, List from typing import Dict, List
from bfxbot.bfxwrapper import BfxWrapper from bfxbot.bfxwrapper import BfxWrapper
from bfxbot.currency import Symbol from bfxbot.currency import Symbol
from bfxbot.models import SymbolStatus, Ticker, EventHandler, Strategy, Event, EventKind from bfxbot.models import SymbolStatus, Ticker, EventHandler, Strategy, Event, EventKind
@ -15,9 +16,9 @@ class BfxBot:
print("API_SECRET is not set!") print("API_SECRET is not set!")
raise ValueError raise ValueError
self.bfx: BfxWrapper = BfxWrapper(api_key, api_secret) self.__bfx: BfxWrapper = BfxWrapper(api_key, api_secret)
self.status: Dict[Symbol, SymbolStatus] = {} self.__ticker: Ticker = Ticker(tick_duration)
self.ticker: Ticker = Ticker(tick_duration) self.__status: Dict[Symbol, SymbolStatus] = {}
if isinstance(symbols, Symbol): if isinstance(symbols, Symbol):
symbols = [symbols] symbols = [symbols]
@ -26,57 +27,57 @@ class BfxBot:
# init symbol statuses # init symbol statuses
for s in self.symbols: for s in self.symbols:
self.status[s] = SymbolStatus(s) self.__status[s] = SymbolStatus(s)
async def __update_status__(self): async def __update_status__(self):
active_positions = await self.bfx.get_active_position() active_positions = await self.__bfx.get_active_position()
for symbol in self.status: for symbol in self.__status:
# updating tick # updating tick
self.status[symbol].current_tick = self.ticker.current_tick self.__status[symbol].current_tick = self.__ticker.current_tick
# updating last price # updating last price
last_price = await self.bfx.get_current_prices(symbol) last_price = await self.__bfx.get_current_prices(symbol)
last_price = last_price[0] last_price = last_price[0]
self.status[symbol].set_price(self.ticker.current_tick, last_price) self.__status[symbol].set_tick_price(self.__ticker.current_tick, last_price)
# updating positions # updating positions
for p in [x for x in active_positions if x.symbol == str(symbol)]: symbol_positions = [x for x in active_positions if x.symbol == str(symbol)]
await self.status[Symbol.from_str(p.symbol)].add_position(p) for p in symbol_positions:
await self.__status[Symbol.from_str(p.symbol)].add_position(p)
# # updating orders # updating orders
# active_orders = await self.bfx.get_active_orders(symbol) active_orders = await self.__bfx.get_active_orders(str(symbol))
#
# for o in active_orders:
# self.status[symbol].add_order(o)
# emitting event for o in active_orders:
await self.status[symbol].__add_event__(Event(EventKind.NEW_TICK, 0, self.ticker.current_tick)) self.__status[symbol].add_order(o)
# emitting new tick event
await self.__status[symbol].__add_event__(Event(EventKind.NEW_TICK, self.__ticker.current_tick))
def event_handler(self, symbol) -> EventHandler: def symbol_event_handler(self, symbol) -> EventHandler:
if symbol not in self.status: if symbol not in self.__status:
return None return None
return self.status[symbol].eh return self.__status[symbol].eh
def status(self, symbol: Symbol) -> SymbolStatus: def symbol_status(self, symbol: Symbol) -> SymbolStatus:
if symbol not in self.status: if symbol not in self.__status:
return None return None
return self.status[symbol] return self.__status[symbol]
async def start(self): async def start(self):
await self.__update_status__() await self.__update_status__()
async def update(self): async def update(self):
self.ticker.inc() sleep(self.__ticker.seconds)
sleep(self.ticker.seconds) self.__ticker.inc()
await self.__update_status__() await self.__update_status__()
def set_strategy(self, symbol, strategy: Strategy): def set_strategy(self, symbol, strategy: Strategy):
if symbol in self.status: if symbol in self.__status:
self.status[symbol].strategy = strategy self.__status[symbol].strategy = strategy
else: else:
self.status[symbol] = SymbolStatus(symbol, strategy) self.__status[symbol] = SymbolStatus(symbol, strategy)

View File

@ -58,14 +58,12 @@ class Ticker:
class Event: class Event:
def __init__(self, kind: EventKind, pid: int, tick: int) -> None: def __init__(self, kind: EventKind, tick: int) -> None:
self.kind: EventKind = kind self.kind: EventKind = kind
self.tick: int = tick self.tick: int = tick
# position ID
self.pid: int = pid
def __repr__(self) -> str: def __repr__(self) -> str:
return f"[{self.pid}]: {self.kind.name} @ Tick {self.tick}" return f"{self.kind.name} @ Tick {self.tick}"
class PositionWrapper: class PositionWrapper:
@ -151,9 +149,10 @@ class SymbolStatus:
pw.set_state(state) pw.set_state(state)
await self.eh.call_state(self, pw) await self.eh.call_state(self, pw)
def set_price(self, tick, price): def set_tick_price(self, tick, price):
self.prices[tick] = price self.prices[tick] = price
class Strategy: class Strategy:
""" """
Defines new position state and events after tick. Defines new position state and events after tick.
@ -236,10 +235,12 @@ class EventHandler:
def registerhandle(handler): def registerhandle(handler):
self.any_events.append(handler) self.any_events.append(handler)
return handler return handler
return registerhandle return registerhandle
def on_any_state(self): def on_any_state(self):
def registerhandle(handler): def registerhandle(handler):
self.any_state.append(handler) self.any_state.append(handler)
return handler return handler
return registerhandle return registerhandle

17
main.py
View File

@ -3,6 +3,7 @@
import asyncio import asyncio
import os import os
import threading import threading
from time import sleep
from typing import List from typing import List
import dotenv import dotenv
@ -34,7 +35,7 @@ socketio = SocketIO(app, async_mode="threading")
bot = BfxBot(api_key=API_KEY, api_secret=API_SECRET, bot = BfxBot(api_key=API_KEY, api_secret=API_SECRET,
symbols=[Symbol.BTC], tick_duration=20) symbols=[Symbol.BTC], tick_duration=20)
bot.set_strategy(Symbol.BTC, TrailingStopStrategy()) bot.set_strategy(Symbol.BTC, TrailingStopStrategy())
btc_eh = bot.event_handler(Symbol.BTC) btc_eh = bot.symbol_event_handler(Symbol.BTC)
# initializing and starting bot on other thread # initializing and starting bot on other thread
threading.Thread(target=lambda: asyncio.run(bot_loop())).start() threading.Thread(target=lambda: asyncio.run(bot_loop())).start()
@ -62,9 +63,19 @@ def on_message(message: dict):
@socketio.on('connect') @socketio.on('connect')
def on_connect(): def on_connect():
# sleeping on exception to avoid race condition
ticks, prices = [], []
while not ticks or not prices:
try:
ticks = bot.symbol_status(Symbol.BTC).all_ticks()
prices = bot.symbol_status(Symbol.BTC).all_prices()
except KeyError:
sleep(1)
socketio.emit("first_connect", socketio.emit("first_connect",
{"ticks": bot.status[Symbol.BTC].all_ticks(), {"ticks": ticks,
"prices": bot.status[Symbol.BTC].all_prices()}) "prices": prices})
################################### ###################################

View File

@ -1,2 +1,4 @@
python-dotenv~=0.15.0 python-dotenv~=0.15.0
sympy~=1.7 sympy~=1.7
asyncio~=3.4.3
Flask~=1.1.2