From 12c9918d2c4c0cc2b61cb16829b76dd3dc9812c0 Mon Sep 17 00:00:00 2001 From: Giulio De Pasquale Date: Sun, 24 Jan 2021 19:36:25 +0000 Subject: [PATCH] broadcasting messages and events. trailing stop alpha version --- rustybot/Cargo.lock | 47 +++++++++++++ rustybot/Cargo.toml | 3 +- rustybot/src/managers.rs | 55 +++++++++++----- rustybot/src/strategy.rs | 138 ++++++++++++++++++++++++++------------- 4 files changed, 183 insertions(+), 60 deletions(-) diff --git a/rustybot/Cargo.lock b/rustybot/Cargo.lock index d1015ad..3c7e15c 100644 --- a/rustybot/Cargo.lock +++ b/rustybot/Cargo.lock @@ -680,6 +680,28 @@ version = "2.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" +[[package]] +name = "merge" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10bbef93abb1da61525bbc45eeaff6473a41907d19f8f9aa5168d214e10693e9" +dependencies = [ + "merge_derive", + "num-traits", +] + +[[package]] +name = "merge_derive" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "209d075476da2e63b4b29e72a2ef627b840589588e71400a25e3565c4f849d07" +dependencies = [ + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "mime" version = "0.3.16" @@ -983,6 +1005,30 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro2" version = "1.0.24" @@ -1144,6 +1190,7 @@ dependencies = [ "float-cmp", "futures-util", "log 0.4.11", + "merge", "regex", "tokio 0.2.24", "tokio-tungstenite", diff --git a/rustybot/Cargo.toml b/rustybot/Cargo.toml index fe7f9b2..b3bc398 100644 --- a/rustybot/Cargo.toml +++ b/rustybot/Cargo.toml @@ -18,4 +18,5 @@ log = "0.4" fern = {version = "0.6", features = ["colored"]} chrono = "0.4" byteorder = "1" -float-cmp = "0.8" \ No newline at end of file +float-cmp = "0.8" +merge = "0.1" \ No newline at end of file diff --git a/rustybot/src/managers.rs b/rustybot/src/managers.rs index c3998e7..79b6133 100644 --- a/rustybot/src/managers.rs +++ b/rustybot/src/managers.rs @@ -1,14 +1,13 @@ use std::collections::HashMap; -use std::ops::Neg; +use std::ops::{Deref, Neg}; use bitfinex::ticker::TradingPairTicker; use futures_util::stream::FuturesUnordered; -use futures_util::StreamExt; use log::{debug, error, info}; use tokio::signal::unix::Signal; use tokio::sync::mpsc::channel; use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, RwLock}; use crate::connectors::{Client, ExchangeDetails}; use crate::currency::SymbolPair; @@ -18,6 +17,8 @@ use crate::models::{ }; use crate::strategy::{FastOrderStrategy, OrderStrategy, PositionStrategy, TrailingStop}; use crate::BoxError; +use merge::Merge; +use tokio::stream::StreamExt; pub type OptionUpdate = (Option>, Option>); @@ -282,14 +283,28 @@ impl PositionManager { // applying strategy to open position and saving into struct Some(position) => { - let (position_after_strategy, strategy_events, strategy_messages) = - self.strategy.on_new_tick(position, &self); + let mut events = None; + let mut messages = None; + + let (pos_on_tick, events_on_tick, messages_on_tick) = self + .strategy + .on_tick(position, self.current_tick(), &self.positions_history); + + let (pos_post_tick, events_post_tick, messages_post_tick) = self + .strategy + .post_tick(pos_on_tick, self.current_tick(), &self.positions_history); + + events.merge(events_on_tick); + events.merge(events_post_tick); + + messages.merge(messages_on_tick); + messages.merge(messages_post_tick); self.positions_history - .insert(self.current_tick(), position_after_strategy.clone()); - self.active_position = Some(position_after_strategy); + .insert(self.current_tick(), pos_post_tick.clone()); + self.active_position = Some(pos_post_tick); - return Ok((strategy_events, strategy_messages)); + return Ok((events, messages)); } } } @@ -501,12 +516,12 @@ impl OrderManager { for m in messages { match m { Message::SubmitOrder { order } => { - info!("Closing open order with a {} order", order.kind()); + info!("Closing open order."); + info!("Cancelling open order #{}", active_order.id); + self.client.cancel_order(active_order).await?; - if let Ok(_) = self.client.submit_order(&order).await { - info!("Cancelling open order #{}", active_order.id); - self.client.cancel_order(active_order).await?; - } + info!("Submitting {}...", order.kind()); + self.client.submit_order(&order).await?; } _ => { debug!("Received unsupported message from order strategy. Unimplemented.") @@ -581,6 +596,9 @@ impl PairManager { } pub async fn update_managers(&mut self, tick: u64) -> Result<(), BoxError> { + let mut events = None; + let mut messages = None; + let (price_results, pos_results, order_results) = tokio::join!( self.price_manager.update(tick), self.position_manager.update(tick), @@ -591,9 +609,16 @@ impl PairManager { let (opt_pos_events, opt_pos_messages) = pos_results?; let (opt_order_events, opt_order_messages) = order_results?; - // TODO: to move into Handler? + events.merge(opt_price_events); + events.merge(opt_pos_events); + events.merge(opt_order_events); - if let Some(messages) = opt_pos_messages { + messages.merge(opt_price_messages); + messages.merge(opt_pos_messages); + messages.merge(opt_order_messages); + + // TODO: to move into Handler? + if let Some(messages) = messages { for m in messages { match m { Message::ClosePosition { position_id } => { diff --git a/rustybot/src/strategy.rs b/rustybot/src/strategy.rs index e7c0890..14d79e3 100644 --- a/rustybot/src/strategy.rs +++ b/rustybot/src/strategy.rs @@ -4,13 +4,13 @@ use std::ops::Neg; use dyn_clone::DynClone; use log::{debug, info}; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, RwLock}; use crate::events::{Event, EventKind, EventMetadata, Message}; use crate::managers::{OrderManager, PositionManager, TrackedPositionsMap}; use crate::models::{ ActiveOrder, OrderBook, OrderBookEntry, OrderForm, OrderKind, Position, PositionProfitState, - TradingPlatform, + PositionState, TradingPlatform, }; use crate::BoxError; @@ -20,10 +20,17 @@ use crate::BoxError; pub trait PositionStrategy: DynClone + Send + Sync { fn name(&self) -> String; - fn on_new_tick( - &self, + fn on_tick( + &mut self, position: Position, - manager: &PositionManager, + current_tick: u64, + positions_history: &HashMap, + ) -> (Position, Option>, Option>); + fn post_tick( + &mut self, + position: Position, + current_tick: u64, + positions_history: &HashMap, ) -> (Position, Option>, Option>); } @@ -65,10 +72,10 @@ pub struct TrailingStop { } impl TrailingStop { - const BREAK_EVEN_PERC: f64 = 0.2; - const MIN_PROFIT_PERC: f64 = TrailingStop::BREAK_EVEN_PERC + 0.3; + const BREAK_EVEN_PERC: f64 = 0.01; + const MIN_PROFIT_PERC: f64 = TrailingStop::BREAK_EVEN_PERC + 0.03; const GOOD_PROFIT_PERC: f64 = TrailingStop::MIN_PROFIT_PERC * 2.5; - const MAX_LOSS_PERC: f64 = -0.01; + const MAX_LOSS_PERC: f64 = -4.0; const TAKER_FEE: f64 = 0.2; @@ -78,8 +85,36 @@ impl TrailingStop { } } - fn net_pl_percentage(pl: f64, fee: f64) -> f64 { - pl - fee + fn update_stop_percentage(&mut self, position: &Position) { + if let Some(profit_state) = position.profit_state() { + let profit_state_delta = match profit_state { + PositionProfitState::MinimumProfit => Some(Self::MIN_PROFIT_PERC), + PositionProfitState::Profit => Some(Self::GOOD_PROFIT_PERC), + _ => None, + }; + + if let Some(profit_state_delta) = profit_state_delta { + let current_stop_percentage = position.pl_perc() - profit_state_delta; + + match profit_state { + PositionProfitState::MinimumProfit | PositionProfitState::Profit => { + match self.stop_percentages.get(&position.id()) { + None => { + self.stop_percentages + .insert(position.id(), current_stop_percentage); + } + Some(existing_threshold) => { + if existing_threshold < ¤t_stop_percentage { + self.stop_percentages + .insert(position.id(), current_stop_percentage); + } + } + } + } + _ => {} + } + } + } } } @@ -88,14 +123,13 @@ impl PositionStrategy for TrailingStop { "Trailing stop".into() } - fn on_new_tick( - &self, + /// Sets the profit state of an open position + fn on_tick( + &mut self, position: Position, - manager: &PositionManager, + current_tick: u64, + positions_history: &HashMap, ) -> (Position, Option>, Option>) { - let mut messages = vec![]; - let events = vec![]; - let pl_perc = position.pl_perc(); let state = { @@ -110,35 +144,21 @@ impl PositionStrategy for TrailingStop { } else if TrailingStop::MAX_LOSS_PERC < pl_perc && pl_perc < 0.0 { PositionProfitState::Loss } else { - debug!("Inserting close position message..."); - messages.push(Message::ClosePosition { - position_id: position.id(), - }); PositionProfitState::Critical } }; - let opt_pre_pw = manager.position_previous_tick(position.id(), None); + let opt_prev_position = positions_history.get(&(current_tick - 1)); let event_metadata = EventMetadata::new(Some(position.id()), None); let new_position = position.clone().with_profit_state(Some(state)); - match opt_pre_pw { + match opt_prev_position { Some(prev) => { if prev.profit_state() == Some(state) { - return ( - new_position, - (!events.is_empty()).then_some(events), - (!messages.is_empty()).then_some(messages), - ); + return (new_position, None, None); } } - None => { - return ( - new_position, - (!events.is_empty()).then_some(events), - (!messages.is_empty()).then_some(messages), - ) - } + None => return (new_position, None, None), }; let events = { @@ -147,31 +167,31 @@ impl PositionStrategy for TrailingStop { if state == PositionProfitState::Profit { events.push(Event::new( EventKind::ReachedGoodProfit, - manager.current_tick(), + current_tick, Some(event_metadata), )); } else if state == PositionProfitState::MinimumProfit { events.push(Event::new( EventKind::ReachedMinProfit, - manager.current_tick(), + current_tick, Some(event_metadata), )); } else if state == PositionProfitState::BreakEven { events.push(Event::new( EventKind::ReachedBreakEven, - manager.current_tick(), + current_tick, Some(event_metadata), )); } else if state == PositionProfitState::Loss { events.push(Event::new( EventKind::ReachedLoss, - manager.current_tick(), + current_tick, Some(event_metadata), )); } else { events.push(Event::new( EventKind::ReachedMaxLoss, - manager.current_tick(), + current_tick, Some(event_metadata), )); } @@ -179,11 +199,41 @@ impl PositionStrategy for TrailingStop { events }; - return ( - new_position, - (!events.is_empty()).then_some(events), - (!messages.is_empty()).then_some(messages), - ); + return (new_position, Some(events), None); + } + + fn post_tick( + &mut self, + position: Position, + _: u64, + _: &HashMap, + ) -> (Position, Option>, Option>) { + let close_message = Message::ClosePosition { + position_id: position.id(), + }; + + // if critical, early return with close position + if let Some(profit_state) = position.profit_state() { + match profit_state { + PositionProfitState::Critical => { + return (position, None, Some(vec![close_message])) + } + _ => {} + } + }; + + // let's check if we surpassed an existing stop percentage + if let Some(existing_stop_percentage) = self.stop_percentages.get(&position.id()) { + if existing_stop_percentage < &position.pl_perc() { + return (position, None, Some(vec![close_message])); + } + } + + self.update_stop_percentage(&position); + + println!("Stop percentages: {:?}", self.stop_percentages); + + (position, None, None) } }