core/main.py
Giulio De Pasquale 7fafdc6293 removed old code
2020-12-15 15:16:16 +00:00

88 lines
2.3 KiB
Python
Executable File

# #!/usr/bin/env python
import asyncio
import os
import threading
from typing import List
import dotenv
from flask import Flask, render_template
from flask_socketio import SocketIO
from bfxbot import BfxBot
from bfxbot.currency import Symbol
from bfxbot.models import PositionWrapper, SymbolStatus, Event, EventKind
from strategy import TrailingStopStrategy
# Flask application and the Socket.IO layer wrapped around it.
app = Flask(__name__)
socketio = SocketIO(app, async_mode="threading")
# Load environment configuration via python-dotenv; start_bot() later reads
# API_KEY and API_SECRET with os.getenv().
dotenv.load_dotenv()
# Event loop created once at import time; start_bot() installs it with
# asyncio.set_event_loop().
loop = asyncio.new_event_loop()
# Single shared bot instance. Stays None until start_bot() creates it on the
# first client connection.
bot: BfxBot = None
@app.route('/')
def entry():
    """Serve the root URL by rendering the ``index.html`` template."""
    page = render_template('index.html')
    return page
@socketio.on("close")
def on_message(message: dict):
    """Handle a ``close`` Socket.IO event.

    Placeholder implementation: it only prints which position would be
    closed and takes no action on the position itself.

    Args:
        message: Event payload. Must contain an ``'id'`` key; a missing key
            raises ``KeyError``.
    """
    notice = "I would close position {}".format(message['id'])
    print(notice)
@socketio.on('connect')
def start_bot():
    """Handle a new Socket.IO connection.

    On the first connection the shared ``bot`` is created with the
    ``API_KEY`` / ``API_SECRET`` environment variables, given a
    ``TrailingStopStrategy`` for ``Symbol.BTC``, and ``bot_loop()`` is started
    in a background thread through ``asyncio.run``.

    On every connection the BTC ticks and prices collected so far are sent as
    a ``first_connect`` event to the client that just connected.
    """
    # Function-scope import: the module header imports only Flask and
    # render_template from flask.
    from flask import request

    global bot

    asyncio.set_event_loop(loop)

    # Explicit None check: only the "not created yet" state should trigger
    # construction, independent of how BfxBot defines truthiness.
    if bot is None:
        bot = BfxBot(api_key=os.getenv("API_KEY"),
                     api_secret=os.getenv("API_SECRET"),
                     symbols=[Symbol.BTC], tick_duration=20)
        bot.set_strategy(Symbol.BTC, TrailingStopStrategy())

        threading.Thread(target=lambda: asyncio.run(bot_loop())).start()
        print("Bot started.")

    btc_status = bot.status[Symbol.BTC]
    # SocketIO.emit() without a recipient broadcasts to every connected
    # client, so each new connection used to re-send the whole history to
    # everybody. Address only the connecting client through its session id.
    socketio.emit("first_connect",
                  {"ticks": btc_status.all_ticks(),
                   "prices": btc_status.all_prices()},
                  room=request.sid)
async def bot_loop():
    """Run the bot: register the tick listener, start the bot, update forever.

    A ``NEW_TICK`` listener is registered on the BTC event handler. For each
    tick it emits a ``new_tick`` Socket.IO event carrying the tick, its price
    and the positions recorded for that tick. The coroutine then awaits
    ``bot.start()`` and awaits ``bot.update()`` in an endless loop; it never
    returns.
    """
    handler = bot.event_handler(Symbol.BTC)

    @handler.on_event(EventKind.NEW_TICK)
    def on_new_tick(event: Event, status: SymbolStatus):
        current_tick = event.tick
        current_price = status.prices[event.tick]

        # Ticks with no recorded positions yield an empty list.
        if event.tick in status.positions:
            wrappers: List[PositionWrapper] = status.positions[event.tick]
        else:
            wrappers = []

        # Reduce each wrapper to a plain dict for the event payload.
        payload_positions = [
            {
                "id": wrapper.position.id,
                "symbol": wrapper.position.symbol,
                "profit_loss": wrapper.position.profit_loss,
                "profit_loss_percentage": wrapper.position.profit_loss_percentage,
            }
            for wrapper in wrappers
        ]

        socketio.emit("new_tick", {"tick": current_tick,
                                   "price": current_price,
                                   "positions": payload_positions})

    await bot.start()

    while True:
        await bot.update()
if __name__ == "__main__":
    # Script entry point: serve the Flask app through the Socket.IO server
    # with debug mode enabled.
    socketio.run(app, debug=True)