core/strategy.py
2020-12-17 09:30:48 +00:00

124 lines
4.7 KiB
Python

from typing import List, Dict
import sympy.abc
from bfxapi import Position
from sympy import Point, solve
from bfxbot.models import Strategy, PositionState, SymbolStatus, Event, EventKind, EventMetadata, PositionWrapper, \
TAKER_FEE
from bfxbot.utils import net_pl_percentage
class SquaredTrailingStop:
    """Piecewise curve: flat outside ``[p_min.x, p_max.x]``, parabolic inside.

    The parabola has the form ``a * (x + b) ** 2 + c``. Its coefficients are
    solved symbolically so that the curve passes through ``p_min`` and
    ``p_max`` and has zero slope at ``p_max.x``.
    """

    def __init__(self, p_min: Point, p_max: Point):
        """Fit the parabola to the two anchor points.

        :param p_min: left anchor; the curve is clamped to ``p_min.y`` before it
        :param p_max: right anchor; the curve is clamped to ``p_max.y`` after it
        """
        self.p_min, self.p_max = p_min, p_max

        sym_a, sym_b, sym_c = sympy.abc.a, sympy.abc.b, sympy.abc.c
        constraints = [
            # derivative of the parabola is zero at p_max.x
            2 * sym_a * (p_max.x + sym_b),
            # parabola goes through p_min
            sym_a * (p_min.x + sym_b) ** 2 + sym_c - p_min.y,
            # parabola goes through p_max
            sym_a * (p_max.x + sym_b) ** 2 + sym_c - p_max.y,
        ]
        # only the first solution returned by the solver is used
        solution = solve(constraints)[0]

        self.a = solution[sym_a]
        self.b = solution[sym_b]
        self.c = solution[sym_c]

    def y(self, x):
        """Return the curve value at ``x``, clamped outside the anchor range."""
        if x < self.p_min.x:
            return self.p_min.y
        if x > self.p_max.x:
            return self.p_max.y
        return self.a * (x + self.b) ** 2 + self.c

    def profit(self, x):
        """Return ``x`` minus the curve value, or 0 when ``x`` is below ``p_min.x``."""
        return 0 if x < self.p_min.x else x - self.y(x)
class TrailingStopStrategy(Strategy):
    """Strategy that classifies a position by its net P/L and trails a stop behind it.

    ``position_on_new_tick`` maps the fee-adjusted P/L percentage of a position
    onto a ``PositionState`` and reports state transitions as events.
    ``update_stop_percentage`` keeps, per position id, a stop level that only
    ever moves up and requests a close when the P/L drops below it.
    """

    # Net P/L percentage thresholds separating the position states.
    BREAK_EVEN_PERC = TAKER_FEE
    MIN_PROFIT_PERC = BREAK_EVEN_PERC + 0.3
    GOOD_PROFIT_PERC = MIN_PROFIT_PERC * 2.5
    MAX_LOSS_PERC = -4.0

    # Curve giving the distance kept between the current P/L and the stop level.
    TRAILING_STOP = SquaredTrailingStop(Point(MIN_PROFIT_PERC, MIN_PROFIT_PERC / 3 * 2), Point(GOOD_PROFIT_PERC, 0.1))

    def __init__(self):
        # position_id : stop percentage
        self.stop_percentage: Dict[int, float] = {}

    def _classify(self, pl_perc) -> PositionState:
        """Map a net P/L percentage onto the matching position state."""
        # `>=` (not `>`): a P/L exactly equal to GOOD_PROFIT_PERC must count as
        # PROFIT; with a strict comparison it matched no profit/loss range and
        # fell through to CRITICAL, closing a winning position.
        if pl_perc >= self.GOOD_PROFIT_PERC:
            return PositionState.PROFIT
        if pl_perc >= self.MIN_PROFIT_PERC:
            return PositionState.MINIMUM_PROFIT
        if pl_perc >= 0.0:
            return PositionState.BREAK_EVEN
        if pl_perc > self.MAX_LOSS_PERC:
            return PositionState.LOSS
        # at or below the maximum tolerated loss (also reached when no
        # comparison above holds, e.g. for a NaN value)
        return PositionState.CRITICAL

    @staticmethod
    def _reached_event_kind(state: PositionState) -> EventKind:
        """Return the event kind announcing that a position entered ``state``."""
        if state == PositionState.PROFIT:
            return EventKind.REACHED_GOOD_PROFIT
        if state == PositionState.MINIMUM_PROFIT:
            return EventKind.REACHED_MIN_PROFIT
        if state == PositionState.BREAK_EVEN:
            return EventKind.REACHED_BREAK_EVEN
        if state == PositionState.LOSS:
            return EventKind.REACHED_LOSS
        return EventKind.REACHED_MAX_LOSS

    def position_on_new_tick(self, current_position: Position, ss: SymbolStatus) -> (PositionWrapper, List[Event]):
        """Classify ``current_position`` for the current tick.

        :param current_position: position to evaluate
        :param ss: symbol status supplying the previous wrapper and the current tick
        :return: the wrapped position (with state and net P/L percentage) and
            the events generated on this tick
        """
        pl_perc = net_pl_percentage(current_position.profit_loss_percentage, TAKER_FEE)
        prev = ss.previous_pw(current_position.id)
        event_metadata = EventMetadata(position_id=current_position.id)

        state = self._classify(pl_perc)

        # NOTE(review): net_profit_loss receives the position's profit_loss as-is;
        # confirm whether a fee-adjusted amount is expected here.
        pw = PositionWrapper(current_position, state=state, net_profit_loss=current_position.profit_loss,
                             net_profit_loss_percentage=pl_perc)

        events: List[Event] = []

        # announce the new state only on an actual transition
        if prev and prev.state() != state:
            events.append(Event(self._reached_event_kind(state), ss.current_tick, event_metadata))

        # A critical position requests a close on every tick, exactly once per
        # tick (previously a transition into CRITICAL emitted it twice).
        if state == PositionState.CRITICAL:
            events.append(Event(EventKind.CLOSE_POSITION, ss.current_tick, event_metadata))

        return pw, events

    async def update_stop_percentage(self, pw: PositionWrapper, ss: SymbolStatus):
        """Create, raise or trigger the trailing stop of the wrapped position.

        :param pw: wrapped position carrying its state and net P/L percentage
        :param ss: symbol status that receives the generated events
        """
        current_pl_perc = pw.net_profit_loss_percentage()
        pid = pw.position.id
        event_metadata = EventMetadata(position_id=pid)

        if pid not in self.stop_percentage:
            # the stop is created for the first time only while in profit
            if pw.state() not in [PositionState.MINIMUM_PROFIT, PositionState.PROFIT]:
                return

            await ss.add_event(Event(EventKind.TRAILING_STOP_SET, ss.current_tick, event_metadata))
            self.stop_percentage[pid] = current_pl_perc - self.TRAILING_STOP.y(current_pl_perc)
            return

        # stop level the current P/L would justify
        candidate_stop = current_pl_perc - self.TRAILING_STOP.y(current_pl_perc)

        # the trailing stop only ever moves up
        if candidate_stop > self.stop_percentage[pid]:
            await ss.add_event(Event(EventKind.TRAILING_STOP_MOVED, ss.current_tick, event_metadata))
            self.stop_percentage[pid] = candidate_stop

        # close position if current P/L below stop percentage
        if current_pl_perc < self.stop_percentage[pid]:
            await ss.add_event(Event(EventKind.CLOSE_POSITION, ss.current_tick, event_metadata))