# #!/usr/bin/env python # # import asyncio # import inspect # import shutil # import time # from enum import Enum # from time import sleep # from typing import Dict, List # # import dotenv # import termplotlib # from asciimatics.screen import Screen # from bfxapi import Client, Order # from bfxapi.models.position import Position # from playsound import playsound # from termcolor import colored # # # class Ticker: # def __init__(self, sec) -> None: # self.seconds: int = sec # self.start_time = time.time() # self.current_tick: int = 1 # # # class EventKind(Enum): # NEW_MINIMUM = 1, # NEW_MAXIMUM = 2, # REACHED_LOSS = 3, # REACHED_BREAK_EVEN = 4, # REACHED_MIN_PROFIT = 5, # REACHED_GOOD_PROFIT = 6, # REACHED_MAX_LOSS = 7, # CLOSE_POSITION = 8, # TRAILING_STOP_SET = 9, # TRAILING_STOP_MOVED = 10, # ORDER_SUBMITTED = 11, # # # class Event: # def __init__(self, kind: EventKind, tick: int) -> None: # self.kind: EventKind = kind # self.tick: int = tick # # def __repr__(self) -> str: # return f"{self.kind.name} @ Tick {self.tick}" # # # class State(Enum): # CRITICAL = -1, # LOSS = 0, # BREAK_EVEN = 1, # MINIMUM_PROFIT = 2, # PROFIT = 3 # # def color(self) -> str: # if self == self.LOSS or self == self.CRITICAL: # return "red" # elif self == self.BREAK_EVEN: # return "yellow" # else: # return "green" # # # class Printer: # def __init__(self, screen: Screen): # self.screen: Screen = screen # self.current_line: int = 0 # (self.current_width, self.current_height) = shutil.get_terminal_size() # # def get_current_line(self) -> int: # return self.current_line # # def next(self) -> int: # line = self.current_line # self.current_line += 1 # return line # # def print_next_line(self, text): # for line in text.split("\n"): # self.screen.print_at(line, 0, self.next(), 1) # self.screen.refresh() # # def reset_current_line(self): # self.current_line = 0 # # def set_screen(self, screen: Screen): # self.screen = screen # # def has_screen_resized(self): # return (self.current_width, 
self.current_height) != shutil.get_terminal_size() # # def to_current_screen_size(self): # (self.current_width, self.current_height) = shutil.get_terminal_size() # # # class Status: # def __init__(self, tick_duration, symbol, printer): # self.ticker: Ticker = Ticker(tick_duration) # self.events: List[Event] = [] # self.symbol = symbol # self.ticks: Dict[int, (float, Position)] = {} # self.current_state: State = State.LOSS # self.printer: Printer = printer # self.stop_percentage: float = None # # async def update(self, position: Position): # self.ticks[self.get_current_tick()] = (await get_current_price(self.symbol), position) # # def wait(self): # sleep(self.ticker.seconds) # self.ticker.current_tick += 1 # # def get_current_tick(self) -> int: # return self.ticker.current_tick # # def last_events(self, n): # return self.events[-n:] # # def last_position(self) -> Position: # return self.ticks[self.ticker.current_tick][1] # # async def add_event(self, event: Event): # self.events.append(event) # await eh.call_event(event, self) # # async def last_price(self) -> float: # return await get_current_price(self.symbol) # # async def set_state(self, state: State): # if self.current_state != state: # event: Event = None # # if state == State.CRITICAL: # event = Event(EventKind.REACHED_MAX_LOSS, # self.get_current_tick()) # elif state == State.LOSS: # event = Event(EventKind.REACHED_LOSS, # self.get_current_tick()) # elif state == State.BREAK_EVEN: # event = Event(EventKind.REACHED_BREAK_EVEN, # self.get_current_tick()) # elif state == State.MINIMUM_PROFIT: # event = Event(EventKind.REACHED_MIN_PROFIT, # self.get_current_tick()) # elif state == State.PROFIT: # event = Event(EventKind.REACHED_GOOD_PROFIT, # self.get_current_tick()) # # self.events.append(event) # await eh.call_event(event, self) # self.current_state = state # # await eh.call_state(self.current_state, self) # # def get_current_state(self) -> State: # return self.current_state # # # class EventHandler: # def 
__init__(self): # self.event_handlers = {} # self.state_handlers = {} # # async def call_event(self, event: Event, status: Status): # value = event.kind.value # if value in self.event_handlers: # for h in self.event_handlers[value]: # if inspect.iscoroutinefunction(h): # await h(event, status) # else: # h(event, status) # # async def call_state(self, state: State, status: Status): # if state in self.state_handlers: # for h in self.state_handlers[state]: # if inspect.iscoroutinefunction(h): # await h(status) # else: # h(status) # # def on_event(self, kind: EventKind): # value = kind.value # # def registerhandler(handler): # if value in self.event_handlers: # self.event_handlers[value].append(handler) # else: # self.event_handlers[value] = [handler] # return handler # # return registerhandler # # def on_state(self, state: State): # def registerhandler(handler): # if state in self.state_handlers: # self.state_handlers[state].append(handler) # else: # self.state_handlers[state] = [handler] # return handler # # return registerhandler # # # dotenv.load() # API_KEY = dotenv.get('API_KEY', default='') # API_SECRET = dotenv.get('API_SECRET', default='') # # bfx = Client( # API_KEY=API_KEY, # API_SECRET=API_SECRET # ).rest # eh = EventHandler() # # TAKER_FEE = 0.2 # MAKER_FEE = 0.1 # # BREAK_EVEN_PERC = TAKER_FEE # MIN_PROFIT_PERC = 0.65 # GOOD_PROFIT_PERC = MIN_PROFIT_PERC * 2.1 # MAX_LOSS_PERC = -3.75 # OFFER_PERC = 0.01 # # TRAIL_STOP_PERCENTAGES = { # State.MINIMUM_PROFIT: 0.2, # State.PROFIT: 0.1 # } # # # @eh.on_event(EventKind.REACHED_GOOD_PROFIT) # def on_good_profit(event: Event, status: Status): # playsound("sounds/coin.mp3") # # # @eh.on_event(EventKind.REACHED_MIN_PROFIT) # def on_min_profit(event: Event, status: Status): # playsound("sounds/1up.mp3") # # # @eh.on_event(EventKind.REACHED_MAX_LOSS) # def on_critical(event: Event, status: Status): # playsound("sounds/gameover.mp3") # # # @eh.on_state(State.MINIMUM_PROFIT) # def on_state_min_profit(status: Status): 
# update_stop_percentage(State.MINIMUM_PROFIT, status) # # current_pl_perc = net_pl_percentage( # status.last_position().profit_loss_percentage, TAKER_FEE) # # if current_pl_perc < status.stop_percentage: # status.add_event(Event(EventKind.CLOSE_POSITION, # status.get_current_tick())) # # # @eh.on_state(State.CRITICAL) # async def on_state_critical(status: Status): # await status.add_event(Event(EventKind.CLOSE_POSITION, # status.get_current_tick())) # # # @eh.on_state(State.PROFIT) # def on_state_min_profit(status: Status): # update_stop_percentage(State.PROFIT, status) # # current_pl_perc = net_pl_percentage( # status.last_position().profit_loss_percentage, TAKER_FEE) # # if current_pl_perc < status.stop_percentage: # status.add_event(Event(EventKind.CLOSE_POSITION, # status.get_current_tick())) # # # def update_stop_percentage(state: State, status: Status): # last_position = status.last_position() # last_pl_net_perc = net_pl_percentage( # last_position.profit_loss_percentage, TAKER_FEE) # # # set stop percentage for first time # if not status.stop_percentage: # status.add_event(Event(EventKind.TRAILING_STOP_SET, # status.get_current_tick())) # status.stop_percentage = last_pl_net_perc - \ # TRAIL_STOP_PERCENTAGES[state] # return # # # moving trailing stop # if last_pl_net_perc - TRAIL_STOP_PERCENTAGES[state] > status.stop_percentage: # status.add_event(Event(EventKind.TRAILING_STOP_MOVED, # status.get_current_tick())) # status.stop_percentage = last_pl_net_perc - \ # TRAIL_STOP_PERCENTAGES[state] # # return # # # @eh.on_event(EventKind.CLOSE_POSITION) # async def on_close_position(event: Event, status: Status): # closing_price = await calculate_best_closing_price(status) # amount = status.last_position().amount * -1 # # await bfx.submit_order(status.symbol, closing_price, amount, Order.Type.LIMIT) # await status.add_event(Event(EventKind.ORDER_SUBMITTED, status.get_current_tick())) # # # @eh.on_event(EventKind.ORDER_SUBMITTED) # def on_order_submitted(event: 
Event, status: Status): # status.printer.print_next_line("ORDER SUBMITTED!") # return # # # async def calculate_best_closing_price(status: Status): # p: Position = status.last_position() # # is_long_pos = p.amount < 0 # # pub_tick = await bfx.get_public_ticker(status.symbol) # # bid_price = pub_tick[0] # ask_price = pub_tick[2] # # if is_long_pos: # closing_price = bid_price * (1 - OFFER_PERC / 100) # else: # closing_price = ask_price * (1 + OFFER_PERC / 100) # # return closing_price # # # def net_pl_percentage(perc: float, reference_fee_perc: float): # return perc - reference_fee_perc # # # async def main(screen: Screen): # min_perc = 999.0 # max_perc = -999.0 # symbol = "tBTCUSD" # # printer = Printer(screen) # status = Status(20, symbol, printer) # balance = await get_usd_balance() # # while True: # positions = [p for p in await bfx.get_active_position() if p.symbol == status.symbol] # orders = await bfx.get_active_orders(symbol) # # current_price = await status.last_price() # # screen.clear() # printer.print_next_line( # "Balance: ${} | Current {} price: {} | Current tick ({} sec): {}".format(colored_float(balance, "white"), # symbol, # colored_float( # current_price, "white", # attrs=["bold"]), # status.ticker.seconds, # status.get_current_tick(), # )) # # if positions: # printer.print_next_line("") # printer.print_next_line("Open {}:".format( # colored("POSITIONS", attrs=["underline"]))) # # for p in [p for p in positions if p.symbol == status.symbol]: # await status.update(p) # # plfees_percentage = net_pl_percentage( # p.profit_loss_percentage, TAKER_FEE) # # if plfees_percentage > GOOD_PROFIT_PERC: # await status.set_state(State.PROFIT) # elif MIN_PROFIT_PERC <= plfees_percentage < GOOD_PROFIT_PERC: # await status.set_state(State.MINIMUM_PROFIT) # elif 0.0 <= plfees_percentage < MIN_PROFIT_PERC: # await status.set_state(State.BREAK_EVEN) # elif MAX_LOSS_PERC < plfees_percentage < 0.0: # await status.set_state(State.LOSS) # else: # await 
status.set_state(State.CRITICAL) # # status_color = status.get_current_state().color() # # # # # min / max calculations # # # if plfees_percentage > max_perc: # max_perc = plfees_percentage # await status.add_event(Event(EventKind.NEW_MAXIMUM, # status.get_current_tick())) # if plfees_percentage < min_perc: # min_perc = plfees_percentage # await status.add_event(Event(EventKind.NEW_MINIMUM, # status.get_current_tick())) # # min_perc_colored = colored_percentage( # min_perc, "red") if min_perc < 0.0 else colored_percentage(min_perc, "green") # max_perc_colored = colored_percentage( # max_perc, "red") if max_perc < 0.0 else colored_percentage(max_perc, "green") # # # # # current status calculations # # # current_colored_format = "{} ({})".format(colored_percentage(plfees_percentage, status_color), # colored_float(p.profit_loss, status_color)) # # # # # Status bar # # # printer.print_next_line("{:1.5f} {} @ {} | {} | min: {}, MAX: {}".format( # p.amount, # p.symbol, # colored_float(p.base_price, "white", attrs=["underline"]), # current_colored_format, # min_perc_colored, # max_perc_colored)) # # # Separator # printer.print_next_line("") # # if orders: # printer.print_next_line("Open {}:".format( # colored("ORDERS", attrs=["underline"]))) # # print_last_events(status, 10, printer) # plot(status, printer) # # printer.reset_current_line() # status.wait() # # return # # # def colored_percentage(perc, color, **kwargs): # return "{}".format(colored("{:1.2f}%".format(perc), color=color, **kwargs)) # # # def colored_float(num, color, **kwargs): # return "{}".format(colored("{:1.2f}".format(num), color=color, **kwargs)) # # # def print_last_events(status: Status, n: int, printer: Printer): # printer.print_next_line(colored(f"Last {n} events:", attrs=["bold"])) # # for e in status.last_events(n): # printer.print_next_line(f"- {e}") # # # def plot(status: Status, printer: Printer): # if status.ticks: # figure = termplotlib.figure() # # x = range(1, status.get_current_tick() + 1) 
#         y = [x[0] for x in status.ticks.values()]
#
#         figure.plot(x, y, width=printer.screen.width,
#                     height=printer.screen.height - printer.get_current_line())
#
#         printer.print_next_line(figure.get_string())
#
#
# async def get_current_price(symbol):
#     tickers = await bfx.get_public_ticker(symbol)
#     return tickers[6]
#
#
# async def get_usd_balance():
#     balance = 0.0
#
#     wallets = await bfx.get_wallets()
#
#     for w in wallets:
#         if w.currency == "USD":
#             balance += w.balance
#         else:
#             current_price = await get_current_price(f"t{w.currency}USD")
#             balance += current_price * w.balance
#
#     return balance
#
#
# if __name__ == "__main__":
#     asyncio.run(Screen.wrapper(main))

import asyncio
import os

import dotenv

from bfxbot import BfxBot
from bfxbot.currency import Symbol
from bfxbot.models import PositionState
from strategy import TrailingStopStrategy
from flask import Flask, render_template

# WSGI application object; the route below is registered on it.
app = Flask(__name__)


@app.route('/')
def entry():
    """Handle requests to the root URL by rendering ``index.html``."""
    page = render_template('index.html')
    return page


# --- Disabled bot bootstrap -------------------------------------------------
# The block below is commented out, so no bot is created or started when this
# module runs; only the Flask application is active.
#
# dotenv.load_dotenv()
#
# API_KEY = os.getenv("API_KEY")
# API_SECRET = os.getenv("API_SECRET")
#
# bot = BfxBot(api_key=API_KEY, api_secret=API_SECRET)
# strategy = TrailingStopStrategy()
# bot.set_strategy(Symbol.BTC, strategy)
#
# eh = bot.event_handler(Symbol.BTC)
#
#
# @eh.on_state(PositionState.PROFIT)
# def on_min_profit(ss, pw):
#     print("Minimum profit!")
#
#
# async def main():
#     await bot.start()
#
#     while True:
#         await bot.update()


if __name__ == '__main__':
    # The asyncio entry point is disabled; the script starts the Flask app
    # instead.
    # asyncio.run(main())
    # NOTE(review): debug=True is presumably meant for local development
    # only -- confirm before running this anywhere reachable by others.
    app.run(debug=True)