from typing import Dict, List, Tuple

import sympy.abc
from bfxapi import Position
from sympy import Point, solve

from bfxbot.models import Strategy, PositionState, SymbolStatus, Event, EventKind, EventMetadata, PositionWrapper, \
    TAKER_FEE
from bfxbot.utils import net_pl_percentage


class SquaredTrailingStop:
    """Trailing-stop distance modelled as a parabola between two points.

    The curve ``y = a * (x + b) ** 2 + c`` is fitted so that it passes through
    ``p_min`` and ``p_max`` and has its vertex (zero slope) at ``p_max.x``.
    Outside the ``[p_min.x, p_max.x]`` interval the value is clamped to the
    ``y`` of the nearest end point.
    """

    def __init__(self, p_min: Point, p_max: Point):
        """Fit the parabola coefficients for the two given points.

        :param p_min: point where the curve starts (lower ``x`` bound).
        :param p_max: point where the curve ends and flattens (upper ``x`` bound).

        NOTE(review): ``solve(...)[0]`` raises ``IndexError`` when the solver
        finds no solution (presumably e.g. when both points share the same
        ``x`` but differ in ``y``) -- confirm whether callers need a guard.
        """
        a = sympy.abc.a
        b = sympy.abc.b
        c = sympy.abc.c

        self.p_min = p_min
        self.p_max = p_max

        # Each expression is implicitly equated to zero by ``solve``:
        # e1: derivative of the parabola vanishes at p_max.x (vertex at p_max)
        # e2: the parabola passes through p_min
        # e3: the parabola passes through p_max
        e1 = 2 * a * (p_max.x + b)
        e2 = a * (p_min.x + b) ** 2 + c - p_min.y
        e3 = a * (p_max.x + b) ** 2 + c - p_max.y

        s = solve([e1, e2, e3])[0]

        self.a, self.b, self.c = s[a], s[b], s[c]

    def y(self, x):
        """Return the curve value at ``x``, clamped outside the fitted range.

        :param x: abscissa to evaluate.
        :return: ``p_min.y`` when ``x < p_min.x``, ``p_max.y`` when
            ``x > p_max.x``, otherwise the parabola evaluated at ``x``.
        """
        if x < self.p_min.x:
            return self.p_min.y
        if x > self.p_max.x:
            return self.p_max.y

        return self.a * (x + self.b) ** 2 + self.c

    def profit(self, x):
        """Return ``x - y(x)``, or ``0`` when ``x`` is below ``p_min.x``."""
        if x < self.p_min.x:
            return 0

        return x - self.y(x)


class TrailingStopStrategy(Strategy):
    """Strategy that classifies positions by net P/L and trails a stop on winners."""

    # Thresholds on the net profit/loss percentage used to classify a position.
    BREAK_EVEN_PERC = TAKER_FEE
    MIN_PROFIT_PERC = BREAK_EVEN_PERC + 0.3
    GOOD_PROFIT_PERC = MIN_PROFIT_PERC * 2.5
    MAX_LOSS_PERC = -4.0

    # Distance kept between the current P/L and the stop: two thirds of the
    # minimum profit at MIN_PROFIT_PERC, shrinking to 0.1 at GOOD_PROFIT_PERC.
    TRAILING_STOP = SquaredTrailingStop(Point(MIN_PROFIT_PERC, MIN_PROFIT_PERC / 3 * 2),
                                        Point(GOOD_PROFIT_PERC, 0.1))

    def __init__(self):
        # position_id : stop percentage
        self.stop_percentage: Dict[int, float] = {}

    def position_on_new_tick(self, current_position: Position, ss: SymbolStatus) \
            -> Tuple[PositionWrapper, List[Event]]:
        """Classify a position on a new tick and collect the resulting events.

        :param current_position: position to evaluate.
        :param ss: symbol status providing the current tick and the previously
            recorded wrapper for this position.
        :return: a tuple ``(wrapper, events)`` where ``wrapper`` holds the
            position together with its newly computed state and net P/L, and
            ``events`` lists what should be emitted for this tick.
        """
        events = []

        pl_perc = net_pl_percentage(current_position.profit_loss_percentage, TAKER_FEE)
        prev = ss.previous_pw(current_position.id)
        event_metadata = EventMetadata(position_id=current_position.id)

        # Cascading inclusive lower bounds: every value maps to exactly one
        # state, including a P/L exactly equal to GOOD_PROFIT_PERC (which must
        # not fall through to the critical branch).
        if pl_perc >= self.GOOD_PROFIT_PERC:
            state = PositionState.PROFIT
        elif pl_perc >= self.MIN_PROFIT_PERC:
            state = PositionState.MINIMUM_PROFIT
        elif pl_perc >= 0.0:
            state = PositionState.BREAK_EVEN
        elif pl_perc > self.MAX_LOSS_PERC:
            state = PositionState.LOSS
        else:
            # At or beyond the maximum loss: ask for the position to be closed
            # on every tick it remains in this state.
            events.append(Event(EventKind.CLOSE_POSITION, ss.current_tick, event_metadata))
            state = PositionState.CRITICAL

        pw = PositionWrapper(current_position, state=state, net_profit_loss=current_position.profit_loss,
                             net_profit_loss_percentage=pl_perc)

        # No previous record or no state change: nothing to announce.
        if not prev or prev.state() == state:
            return pw, events

        if state == PositionState.PROFIT:
            events.append(Event(EventKind.REACHED_GOOD_PROFIT, ss.current_tick, event_metadata))
        elif state == PositionState.MINIMUM_PROFIT:
            events.append(Event(EventKind.REACHED_MIN_PROFIT, ss.current_tick, event_metadata))
        elif state == PositionState.BREAK_EVEN:
            events.append(Event(EventKind.REACHED_BREAK_EVEN, ss.current_tick, event_metadata))
        elif state == PositionState.LOSS:
            events.append(Event(EventKind.REACHED_LOSS, ss.current_tick, event_metadata))
        else:
            # CLOSE_POSITION was already queued when the state was classified
            # as critical, so only the transition itself is reported here.
            events.append(Event(EventKind.REACHED_MAX_LOSS, ss.current_tick, event_metadata))

        return pw, events

    async def update_stop_percentage(self, pw: PositionWrapper, ss: SymbolStatus) -> None:
        """Create, move or trigger the trailing stop for a position.

        :param pw: wrapped position with its current state and net P/L.
        :param ss: symbol status used to publish events for the current tick.
        """
        current_pl_perc = pw.net_profit_loss_percentage()
        pid = pw.position.id
        event_metadata = EventMetadata(position_id=pid)
        has_stop = pid in self.stop_percentage

        # if trailing stop not set for this position and state is not profit (we should not set it)
        if not has_stop and pw.state() not in [PositionState.MINIMUM_PROFIT, PositionState.PROFIT]:
            return

        # stop level implied by the current P/L
        candidate_stop = current_pl_perc - self.TRAILING_STOP.y(current_pl_perc)

        # set stop percentage for first time only if in profit
        if not has_stop:
            await ss.add_event(Event(EventKind.TRAILING_STOP_SET, ss.current_tick, event_metadata))
            self.stop_percentage[pid] = candidate_stop
            return

        # moving trailing stop (it only ever moves up)
        if candidate_stop > self.stop_percentage[pid]:
            await ss.add_event(Event(EventKind.TRAILING_STOP_MOVED, ss.current_tick, event_metadata))
            self.stop_percentage[pid] = candidate_stop

        # close position if current P/L below stop percentage
        if current_pl_perc < self.stop_percentage[pid]:
            await ss.add_event(Event(EventKind.CLOSE_POSITION, ss.current_tick, event_metadata))