broadcasting messages and events. trailing stop alpha version

This commit is contained in:
Giulio De Pasquale 2021-01-24 19:36:25 +00:00
parent 4999cdc498
commit 12c9918d2c
4 changed files with 183 additions and 60 deletions

47
rustybot/Cargo.lock generated
View File

@ -680,6 +680,28 @@ version = "2.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525"
[[package]]
name = "merge"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10bbef93abb1da61525bbc45eeaff6473a41907d19f8f9aa5168d214e10693e9"
dependencies = [
"merge_derive",
"num-traits",
]
[[package]]
name = "merge_derive"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "209d075476da2e63b4b29e72a2ef627b840589588e71400a25e3565c4f849d07"
dependencies = [
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "mime" name = "mime"
version = "0.3.16" version = "0.3.16"
@ -983,6 +1005,30 @@ version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.24" version = "1.0.24"
@ -1144,6 +1190,7 @@ dependencies = [
"float-cmp", "float-cmp",
"futures-util", "futures-util",
"log 0.4.11", "log 0.4.11",
"merge",
"regex", "regex",
"tokio 0.2.24", "tokio 0.2.24",
"tokio-tungstenite", "tokio-tungstenite",

View File

@ -19,3 +19,4 @@ fern = {version = "0.6", features = ["colored"]}
chrono = "0.4" chrono = "0.4"
byteorder = "1" byteorder = "1"
float-cmp = "0.8" float-cmp = "0.8"
merge = "0.1"

View File

@ -1,14 +1,13 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::ops::Neg; use std::ops::{Deref, Neg};
use bitfinex::ticker::TradingPairTicker; use bitfinex::ticker::TradingPairTicker;
use futures_util::stream::FuturesUnordered; use futures_util::stream::FuturesUnordered;
use futures_util::StreamExt;
use log::{debug, error, info}; use log::{debug, error, info};
use tokio::signal::unix::Signal; use tokio::signal::unix::Signal;
use tokio::sync::mpsc::channel; use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::oneshot; use tokio::sync::{oneshot, RwLock};
use crate::connectors::{Client, ExchangeDetails}; use crate::connectors::{Client, ExchangeDetails};
use crate::currency::SymbolPair; use crate::currency::SymbolPair;
@ -18,6 +17,8 @@ use crate::models::{
}; };
use crate::strategy::{FastOrderStrategy, OrderStrategy, PositionStrategy, TrailingStop}; use crate::strategy::{FastOrderStrategy, OrderStrategy, PositionStrategy, TrailingStop};
use crate::BoxError; use crate::BoxError;
use merge::Merge;
use tokio::stream::StreamExt;
pub type OptionUpdate = (Option<Vec<Event>>, Option<Vec<Message>>); pub type OptionUpdate = (Option<Vec<Event>>, Option<Vec<Message>>);
@ -282,14 +283,28 @@ impl PositionManager {
// applying strategy to open position and saving into struct // applying strategy to open position and saving into struct
Some(position) => { Some(position) => {
let (position_after_strategy, strategy_events, strategy_messages) = let mut events = None;
self.strategy.on_new_tick(position, &self); let mut messages = None;
let (pos_on_tick, events_on_tick, messages_on_tick) = self
.strategy
.on_tick(position, self.current_tick(), &self.positions_history);
let (pos_post_tick, events_post_tick, messages_post_tick) = self
.strategy
.post_tick(pos_on_tick, self.current_tick(), &self.positions_history);
events.merge(events_on_tick);
events.merge(events_post_tick);
messages.merge(messages_on_tick);
messages.merge(messages_post_tick);
self.positions_history self.positions_history
.insert(self.current_tick(), position_after_strategy.clone()); .insert(self.current_tick(), pos_post_tick.clone());
self.active_position = Some(position_after_strategy); self.active_position = Some(pos_post_tick);
return Ok((strategy_events, strategy_messages)); return Ok((events, messages));
} }
} }
} }
@ -501,12 +516,12 @@ impl OrderManager {
for m in messages { for m in messages {
match m { match m {
Message::SubmitOrder { order } => { Message::SubmitOrder { order } => {
info!("Closing open order with a {} order", order.kind()); info!("Closing open order.");
if let Ok(_) = self.client.submit_order(&order).await {
info!("Cancelling open order #{}", active_order.id); info!("Cancelling open order #{}", active_order.id);
self.client.cancel_order(active_order).await?; self.client.cancel_order(active_order).await?;
}
info!("Submitting {}...", order.kind());
self.client.submit_order(&order).await?;
} }
_ => { _ => {
debug!("Received unsupported message from order strategy. Unimplemented.") debug!("Received unsupported message from order strategy. Unimplemented.")
@ -581,6 +596,9 @@ impl PairManager {
} }
pub async fn update_managers(&mut self, tick: u64) -> Result<(), BoxError> { pub async fn update_managers(&mut self, tick: u64) -> Result<(), BoxError> {
let mut events = None;
let mut messages = None;
let (price_results, pos_results, order_results) = tokio::join!( let (price_results, pos_results, order_results) = tokio::join!(
self.price_manager.update(tick), self.price_manager.update(tick),
self.position_manager.update(tick), self.position_manager.update(tick),
@ -591,9 +609,16 @@ impl PairManager {
let (opt_pos_events, opt_pos_messages) = pos_results?; let (opt_pos_events, opt_pos_messages) = pos_results?;
let (opt_order_events, opt_order_messages) = order_results?; let (opt_order_events, opt_order_messages) = order_results?;
// TODO: to move into Handler? events.merge(opt_price_events);
events.merge(opt_pos_events);
events.merge(opt_order_events);
if let Some(messages) = opt_pos_messages { messages.merge(opt_price_messages);
messages.merge(opt_pos_messages);
messages.merge(opt_order_messages);
// TODO: to move into Handler?
if let Some(messages) = messages {
for m in messages { for m in messages {
match m { match m {
Message::ClosePosition { position_id } => { Message::ClosePosition { position_id } => {

View File

@ -4,13 +4,13 @@ use std::ops::Neg;
use dyn_clone::DynClone; use dyn_clone::DynClone;
use log::{debug, info}; use log::{debug, info};
use tokio::sync::oneshot; use tokio::sync::{oneshot, RwLock};
use crate::events::{Event, EventKind, EventMetadata, Message}; use crate::events::{Event, EventKind, EventMetadata, Message};
use crate::managers::{OrderManager, PositionManager, TrackedPositionsMap}; use crate::managers::{OrderManager, PositionManager, TrackedPositionsMap};
use crate::models::{ use crate::models::{
ActiveOrder, OrderBook, OrderBookEntry, OrderForm, OrderKind, Position, PositionProfitState, ActiveOrder, OrderBook, OrderBookEntry, OrderForm, OrderKind, Position, PositionProfitState,
TradingPlatform, PositionState, TradingPlatform,
}; };
use crate::BoxError; use crate::BoxError;
@ -20,10 +20,17 @@ use crate::BoxError;
pub trait PositionStrategy: DynClone + Send + Sync { pub trait PositionStrategy: DynClone + Send + Sync {
fn name(&self) -> String; fn name(&self) -> String;
fn on_new_tick( fn on_tick(
&self, &mut self,
position: Position, position: Position,
manager: &PositionManager, current_tick: u64,
positions_history: &HashMap<u64, Position>,
) -> (Position, Option<Vec<Event>>, Option<Vec<Message>>);
fn post_tick(
&mut self,
position: Position,
current_tick: u64,
positions_history: &HashMap<u64, Position>,
) -> (Position, Option<Vec<Event>>, Option<Vec<Message>>); ) -> (Position, Option<Vec<Event>>, Option<Vec<Message>>);
} }
@ -65,10 +72,10 @@ pub struct TrailingStop {
} }
impl TrailingStop { impl TrailingStop {
const BREAK_EVEN_PERC: f64 = 0.2; const BREAK_EVEN_PERC: f64 = 0.01;
const MIN_PROFIT_PERC: f64 = TrailingStop::BREAK_EVEN_PERC + 0.3; const MIN_PROFIT_PERC: f64 = TrailingStop::BREAK_EVEN_PERC + 0.03;
const GOOD_PROFIT_PERC: f64 = TrailingStop::MIN_PROFIT_PERC * 2.5; const GOOD_PROFIT_PERC: f64 = TrailingStop::MIN_PROFIT_PERC * 2.5;
const MAX_LOSS_PERC: f64 = -0.01; const MAX_LOSS_PERC: f64 = -4.0;
const TAKER_FEE: f64 = 0.2; const TAKER_FEE: f64 = 0.2;
@ -78,8 +85,36 @@ impl TrailingStop {
} }
} }
fn net_pl_percentage(pl: f64, fee: f64) -> f64 { fn update_stop_percentage(&mut self, position: &Position) {
pl - fee if let Some(profit_state) = position.profit_state() {
let profit_state_delta = match profit_state {
PositionProfitState::MinimumProfit => Some(Self::MIN_PROFIT_PERC),
PositionProfitState::Profit => Some(Self::GOOD_PROFIT_PERC),
_ => None,
};
if let Some(profit_state_delta) = profit_state_delta {
let current_stop_percentage = position.pl_perc() - profit_state_delta;
match profit_state {
PositionProfitState::MinimumProfit | PositionProfitState::Profit => {
match self.stop_percentages.get(&position.id()) {
None => {
self.stop_percentages
.insert(position.id(), current_stop_percentage);
}
Some(existing_threshold) => {
if existing_threshold < &current_stop_percentage {
self.stop_percentages
.insert(position.id(), current_stop_percentage);
}
}
}
}
_ => {}
}
}
}
} }
} }
@ -88,14 +123,13 @@ impl PositionStrategy for TrailingStop {
"Trailing stop".into() "Trailing stop".into()
} }
fn on_new_tick( /// Sets the profit state of an open position
&self, fn on_tick(
&mut self,
position: Position, position: Position,
manager: &PositionManager, current_tick: u64,
positions_history: &HashMap<u64, Position>,
) -> (Position, Option<Vec<Event>>, Option<Vec<Message>>) { ) -> (Position, Option<Vec<Event>>, Option<Vec<Message>>) {
let mut messages = vec![];
let events = vec![];
let pl_perc = position.pl_perc(); let pl_perc = position.pl_perc();
let state = { let state = {
@ -110,35 +144,21 @@ impl PositionStrategy for TrailingStop {
} else if TrailingStop::MAX_LOSS_PERC < pl_perc && pl_perc < 0.0 { } else if TrailingStop::MAX_LOSS_PERC < pl_perc && pl_perc < 0.0 {
PositionProfitState::Loss PositionProfitState::Loss
} else { } else {
debug!("Inserting close position message...");
messages.push(Message::ClosePosition {
position_id: position.id(),
});
PositionProfitState::Critical PositionProfitState::Critical
} }
}; };
let opt_pre_pw = manager.position_previous_tick(position.id(), None); let opt_prev_position = positions_history.get(&(current_tick - 1));
let event_metadata = EventMetadata::new(Some(position.id()), None); let event_metadata = EventMetadata::new(Some(position.id()), None);
let new_position = position.clone().with_profit_state(Some(state)); let new_position = position.clone().with_profit_state(Some(state));
match opt_pre_pw { match opt_prev_position {
Some(prev) => { Some(prev) => {
if prev.profit_state() == Some(state) { if prev.profit_state() == Some(state) {
return ( return (new_position, None, None);
new_position,
(!events.is_empty()).then_some(events),
(!messages.is_empty()).then_some(messages),
);
} }
} }
None => { None => return (new_position, None, None),
return (
new_position,
(!events.is_empty()).then_some(events),
(!messages.is_empty()).then_some(messages),
)
}
}; };
let events = { let events = {
@ -147,31 +167,31 @@ impl PositionStrategy for TrailingStop {
if state == PositionProfitState::Profit { if state == PositionProfitState::Profit {
events.push(Event::new( events.push(Event::new(
EventKind::ReachedGoodProfit, EventKind::ReachedGoodProfit,
manager.current_tick(), current_tick,
Some(event_metadata), Some(event_metadata),
)); ));
} else if state == PositionProfitState::MinimumProfit { } else if state == PositionProfitState::MinimumProfit {
events.push(Event::new( events.push(Event::new(
EventKind::ReachedMinProfit, EventKind::ReachedMinProfit,
manager.current_tick(), current_tick,
Some(event_metadata), Some(event_metadata),
)); ));
} else if state == PositionProfitState::BreakEven { } else if state == PositionProfitState::BreakEven {
events.push(Event::new( events.push(Event::new(
EventKind::ReachedBreakEven, EventKind::ReachedBreakEven,
manager.current_tick(), current_tick,
Some(event_metadata), Some(event_metadata),
)); ));
} else if state == PositionProfitState::Loss { } else if state == PositionProfitState::Loss {
events.push(Event::new( events.push(Event::new(
EventKind::ReachedLoss, EventKind::ReachedLoss,
manager.current_tick(), current_tick,
Some(event_metadata), Some(event_metadata),
)); ));
} else { } else {
events.push(Event::new( events.push(Event::new(
EventKind::ReachedMaxLoss, EventKind::ReachedMaxLoss,
manager.current_tick(), current_tick,
Some(event_metadata), Some(event_metadata),
)); ));
} }
@ -179,11 +199,41 @@ impl PositionStrategy for TrailingStop {
events events
}; };
return ( return (new_position, Some(events), None);
new_position, }
(!events.is_empty()).then_some(events),
(!messages.is_empty()).then_some(messages), fn post_tick(
); &mut self,
position: Position,
_: u64,
_: &HashMap<u64, Position>,
) -> (Position, Option<Vec<Event>>, Option<Vec<Message>>) {
let close_message = Message::ClosePosition {
position_id: position.id(),
};
// if critical, early return with close position
if let Some(profit_state) = position.profit_state() {
match profit_state {
PositionProfitState::Critical => {
return (position, None, Some(vec![close_message]))
}
_ => {}
}
};
// let's check if we surpassed an existing stop percentage
if let Some(existing_stop_percentage) = self.stop_percentages.get(&position.id()) {
if existing_stop_percentage < &position.pl_perc() {
return (position, None, Some(vec![close_message]));
}
}
self.update_stop_percentage(&position);
println!("Stop percentages: {:?}", self.stop_percentages);
(position, None, None)
} }
} }