from typing import List import sympy.abc from bfxapi import Position from sympy import Point, solve from bfxbot.models import Strategy, PositionState, SymbolStatus, Event, EventKind, EventMetadata, PositionWrapper, \ TAKER_FEE from bfxbot.utils import net_pl_percentage class SquaredTrailingStop: def __init__(self, p_min: Point, p_max: Point): a = sympy.abc.a b = sympy.abc.b c = sympy.abc.c self.p_min = p_min self.p_max = p_max e1 = 2 * a * (p_max.x + b) e2 = a * (p_min.x + b) ** 2 + c - p_min.y e3 = a * (p_max.x + b) ** 2 + c - p_max.y s = solve([e1, e2, e3])[0] self.a, self.b, self.c = s[a], s[b], s[c] def y(self, x): def inter_y(x): return self.a * (x + self.b) ** 2 + self.c if x < self.p_min.x: return self.p_min.y elif x > self.p_max.x: return self.p_max.y else: return inter_y(x) def profit(self, x): if x < self.p_min.x: return 0 return x - self.y(x) class TrailingStopStrategy(Strategy): BREAK_EVEN_PERC = TAKER_FEE MIN_PROFIT_PERC = BREAK_EVEN_PERC + 0.3 GOOD_PROFIT_PERC = MIN_PROFIT_PERC * 2.5 MAX_LOSS_PERC = -3.75 TRAILING_STOP = SquaredTrailingStop(Point(MIN_PROFIT_PERC, MIN_PROFIT_PERC / 3 * 2), Point(GOOD_PROFIT_PERC, 0.1)) def __init__(self): self.stop_percentage: float = None def position_on_new_tick(self, position: Position, ss: SymbolStatus) -> (PositionState, List[Event]): events = [] pl_perc = net_pl_percentage(position.profit_loss_percentage, TAKER_FEE) prev = ss.previous_pw(position.id) event_metadata = EventMetadata(position_id=position.id) if pl_perc > self.GOOD_PROFIT_PERC: state = PositionState.PROFIT elif self.MIN_PROFIT_PERC <= pl_perc < self.GOOD_PROFIT_PERC: state = PositionState.MINIMUM_PROFIT elif 0.0 <= pl_perc < self.MIN_PROFIT_PERC: state = PositionState.BREAK_EVEN elif self.MAX_LOSS_PERC < pl_perc < 0.0: state = PositionState.LOSS else: state = PositionState.CRITICAL pw = PositionWrapper(position, state=state, net_profit_loss=position.profit_loss, net_profit_loss_percentage=pl_perc) if not prev or prev.state() == state: return pw, events if state == PositionState.PROFIT: events.append(Event(EventKind.REACHED_GOOD_PROFIT, ss.current_tick, event_metadata)) elif state == PositionState.MINIMUM_PROFIT: events.append(Event(EventKind.REACHED_MIN_PROFIT, ss.current_tick, event_metadata)) elif state == PositionState.BREAK_EVEN: events.append(Event(EventKind.REACHED_BREAK_EVEN, ss.current_tick, event_metadata)) elif state == PositionState.LOSS: events.append(Event(EventKind.REACHED_LOSS, ss.current_tick, event_metadata)) else: events.append(Event(EventKind.REACHED_MAX_LOSS, ss.current_tick, event_metadata)) events.append(Event(EventKind.CLOSE_POSITION, ss.current_tick, event_metadata)) return pw, events async def update_stop_percentage(self, pw: PositionWrapper, ss: SymbolStatus): current_pl_perc = pw.net_profit_loss_percentage() # set stop percentage for first time if not self.stop_percentage: await ss.add_event(Event(EventKind.TRAILING_STOP_SET, ss.current_tick)) self.stop_percentage = current_pl_perc - self.TRAILING_STOP.y(current_pl_perc) return # moving trailing stop if current_pl_perc - self.TRAILING_STOP.y(pw.net_profit_loss_percentage()) > self.stop_percentage: await ss.add_event(Event(EventKind.TRAILING_STOP_MOVED, ss.current_tick)) self.stop_percentage = current_pl_perc - self.TRAILING_STOP.y(current_pl_perc) return