From 8283ecde6099c28f7af14377748d9905dd4c0ead Mon Sep 17 00:00:00 2001 From: Giulio De Pasquale Date: Mon, 18 Jan 2021 11:54:40 +0000 Subject: [PATCH] refactored SignalKind into Message and ActorMessage --- rustybot/src/events.rs | 241 ++++++++++++++++++++------------------- rustybot/src/managers.rs | 118 ++++++++++++------- rustybot/src/strategy.rs | 11 +- 3 files changed, 208 insertions(+), 162 deletions(-) diff --git a/rustybot/src/events.rs b/rustybot/src/events.rs index e2064bb..5ef4766 100644 --- a/rustybot/src/events.rs +++ b/rustybot/src/events.rs @@ -3,17 +3,24 @@ use std::future::Future; use tokio::task::JoinHandle; -use crate::managers::{OrderManager, PositionManager, PriceManager}; +use crate::managers::{OptionUpdate, OrderManager, PositionManager, PriceManager}; use crate::models::{Position, PositionProfitState}; +use tokio::sync::oneshot; -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub enum SignalKind { - Update(u64), - ClosePosition(Position), +#[derive(Debug)] +pub struct ActorMessage { + pub(crate) message: Message, + pub(crate) respond_to: oneshot::Sender, +} + +#[derive(Debug)] +pub enum Message { + Update { tick: u64 }, + ClosePosition { position: Position }, OpenPosition, } -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] pub struct EventMetadata { position_id: Option, order_id: Option, @@ -44,7 +51,7 @@ pub enum EventKind { Any, } -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)] pub struct Event { kind: EventKind, tick: u64, @@ -74,113 +81,113 @@ impl Event { self.metadata } } - -pub struct Dispatcher { - event_handlers: HashMap JoinHandle<()>>>>, - profit_state_handlers: - HashMap JoinHandle<()>>>>, - signal_handlers: HashMap JoinHandle<()>>>>, - - on_any_event_handlers: Vec JoinHandle<()>>>, - on_any_profit_state_handlers: Vec JoinHandle<()>>>, -} - -impl Dispatcher { - pub fn new() -> Self { - Dispatcher { - event_handlers: HashMap::new(), - profit_state_handlers: HashMap::new(), - signal_handlers: HashMap::new(), - on_any_event_handlers: Vec::new(), - on_any_profit_state_handlers: Vec::new(), - } - } - - pub fn call_signal_handlers(&self, signal: &SignalKind) { - if let Some(functions) = self.signal_handlers.get(&signal) { - for f in functions { - f(signal); - } - } - } - - pub fn call_event_handlers(&self, event: &Event, status: &PriceManager) { - if let Some(functions) = self.event_handlers.get(&event.kind()) { - for f in functions { - f(event, status); - } - } - - for f in &self.on_any_event_handlers { - f(event, status); - } - } - - pub fn call_position_state_handlers(&self, position: &Position, status: &PriceManager) { - if let Some(profit_state) = &position.profit_state() { - if let Some(functions) = self.profit_state_handlers.get(profit_state) { - for f in functions { - f(position, status); - } - } - } - - for f in &self.on_any_profit_state_handlers { - f(position, status); - } - } - - pub fn register_event_handler(&mut self, event: EventKind, f: F) - where - F: Fn(&Event, &PriceManager) -> Fut, - Fut: Future + Send, - { - match event { - EventKind::Any => self - .on_any_event_handlers - .push(Box::new(move |e, s| tokio::spawn(f(&e, s)))), - _ => self - .event_handlers - .entry(event) - .or_default() - .push(Box::new(move |e, s| tokio::spawn(f(&e, s)))), - } - } - - pub fn register_positionstate_handler( - &mut self, - state: PositionProfitState, - f: F, - ) where - F: Fn(&Position, &PriceManager) -> Fut, - Fut: Future + Send, - { - match state { - // PositionProfitState::Any => self - // .on_any_position_state_handlers - // .push(Box::new(move |p, s| tokio::spawn(f(&p, s)))), - _ => self - .profit_state_handlers - .entry(state) - .or_default() - .push(Box::new(move |p, s| tokio::spawn(f(&p, s)))), - } - } - - pub fn register_signal_handler(&mut self, signal: SignalKind, f: F) - where - F: Fn(&SignalKind) -> Fut, - Fut: Future + Send, - { - match signal { - // PositionProfitState::Any => self - // .on_any_position_state_handlers - // .push(Box::new(move |p, s| tokio::spawn(f(&p, s)))), - _ => self - .signal_handlers - .entry(signal) - .or_default() - .push(Box::new(move |s| tokio::spawn(f(s)))), - } - } -} +// +// pub struct Dispatcher { +// event_handlers: HashMap JoinHandle<()>>>>, +// profit_state_handlers: +// HashMap JoinHandle<()>>>>, +// signal_handlers: HashMap JoinHandle<()>>>>, +// +// on_any_event_handlers: Vec JoinHandle<()>>>, +// on_any_profit_state_handlers: Vec JoinHandle<()>>>, +// } +// +// impl Dispatcher { +// pub fn new() -> Self { +// Dispatcher { +// event_handlers: HashMap::new(), +// profit_state_handlers: HashMap::new(), +// signal_handlers: HashMap::new(), +// on_any_event_handlers: Vec::new(), +// on_any_profit_state_handlers: Vec::new(), +// } +// } +// +// pub fn call_signal_handlers(&self, signal: &SignalKind) { +// if let Some(functions) = self.signal_handlers.get(&signal) { +// for f in functions { +// f(signal); +// } +// } +// } +// +// pub fn call_event_handlers(&self, event: &Event, status: &PriceManager) { +// if let Some(functions) = self.event_handlers.get(&event.kind()) { +// for f in functions { +// f(event, status); +// } +// } +// +// for f in &self.on_any_event_handlers { +// f(event, status); +// } +// } +// +// pub fn call_position_state_handlers(&self, position: &Position, status: &PriceManager) { +// if let Some(profit_state) = &position.profit_state() { +// if let Some(functions) = self.profit_state_handlers.get(profit_state) { +// for f in functions { +// f(position, status); +// } +// } +// } +// +// for f in &self.on_any_profit_state_handlers { +// f(position, status); +// } +// } +// +// pub fn register_event_handler(&mut self, event: EventKind, f: F) +// where +// F: Fn(&Event, &PriceManager) -> Fut, +// Fut: Future + Send, +// { +// match event { +// EventKind::Any => self +// .on_any_event_handlers +// .push(Box::new(move |e, s| tokio::spawn(f(&e, s)))), +// _ => self +// .event_handlers +// .entry(event) +// .or_default() +// .push(Box::new(move |e, s| tokio::spawn(f(&e, s)))), +// } +// } +// +// pub fn register_positionstate_handler( +// &mut self, +// state: PositionProfitState, +// f: F, +// ) where +// F: Fn(&Position, &PriceManager) -> Fut, +// Fut: Future + Send, +// { +// match state { +// // PositionProfitState::Any => self +// // .on_any_position_state_handlers +// // .push(Box::new(move |p, s| tokio::spawn(f(&p, s)))), +// _ => self +// .profit_state_handlers +// .entry(state) +// .or_default() +// .push(Box::new(move |p, s| tokio::spawn(f(&p, s)))), +// } +// } +// +// pub fn register_signal_handler(&mut self, signal: SignalKind, f: F) +// where +// F: Fn(&SignalKind) -> Fut, +// Fut: Future + Send, +// { +// match signal { +// // PositionProfitState::Any => self +// // .on_any_position_state_handlers +// // .push(Box::new(move |p, s| tokio::spawn(f(&p, s)))), +// _ => self +// .signal_handlers +// .entry(signal) +// .or_default() +// .push(Box::new(move |s| tokio::spawn(f(s)))), +// } +// } +// } diff --git a/rustybot/src/managers.rs b/rustybot/src/managers.rs index ca066af..baa3de3 100644 --- a/rustybot/src/managers.rs +++ b/rustybot/src/managers.rs @@ -8,15 +8,16 @@ use log::{debug, error, info}; use tokio::signal::unix::Signal; use tokio::sync::mpsc::channel; use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::oneshot; use crate::connectors::{Client, ExchangeKind}; use crate::currency::SymbolPair; -use crate::events::{Dispatcher, Event, SignalKind}; +use crate::events::{ActorMessage, Event, Message}; use crate::models::{ExecutedOrder, OrderForm, OrderKind, Position, PriceTicker}; use crate::strategy::{FastOrderStrategy, OrderStrategy, PositionStrategy, TrailingStop}; use crate::BoxError; -type OptionUpdate = (Option>, Option>); +pub type OptionUpdate = (Option>, Option>); pub struct EventManager { events: Vec, @@ -28,14 +29,14 @@ pub struct EventManager { #[derive(Debug)] pub struct PriceManager { - receiver: Receiver, + receiver: Receiver, pair: SymbolPair, prices: Vec, client: Client, } pub struct PriceManagerHandle { - sender: Sender, + sender: Sender, } impl PriceManagerHandle { @@ -54,13 +55,22 @@ impl PriceManagerHandle { Self { sender } } - pub async fn update(&mut self, tick: u64) { - self.sender.send(SignalKind::Update(tick)).await.unwrap(); + pub async fn update(&mut self, tick: u64) -> Result { + let (send, recv) = oneshot::channel(); + + self.sender + .send(ActorMessage { + message: Message::Update { tick }, + respond_to: send, + }) + .await?; + + Ok(recv.await?) } } impl PriceManager { - pub fn new(receiver: Receiver, pair: SymbolPair, client: Client) -> Self { + pub fn new(receiver: Receiver, pair: SymbolPair, client: Client) -> Self { PriceManager { receiver, pair, @@ -69,9 +79,9 @@ impl PriceManager { } } - pub async fn handle_message(&mut self, message: SignalKind) -> Result<(), BoxError> { - match message { - SignalKind::Update(tick) => { + pub async fn handle_message(&mut self, message: ActorMessage) -> Result<(), BoxError> { + match message.message { + Message::Update { tick } => { let a = self.update(tick).await?; self.add_entry(a); } @@ -93,7 +103,7 @@ impl PriceManager { current_prices, self.pair.clone(), None, - None, + // None, )) } @@ -118,7 +128,7 @@ pub struct PriceEntry { pair: SymbolPair, price: PriceTicker, events: Option>, - signals: Option>, + // signals: Option>, } impl PriceEntry { @@ -127,14 +137,14 @@ impl PriceEntry { price: PriceTicker, pair: SymbolPair, events: Option>, - signals: Option>, + // signals: Option>, ) -> Self { PriceEntry { tick, pair, price, events, - signals, + // signals, } } @@ -150,9 +160,9 @@ impl PriceEntry { pub fn events(&self) -> &Option> { &self.events } - pub fn signals(&self) -> &Option> { - &self.signals - } + // pub fn signals(&self) -> &Option> { + // &self.signals + // } } /****************** @@ -160,7 +170,7 @@ impl PriceEntry { ******************/ pub struct PositionManagerHandle { - sender: Sender, + sender: Sender, } impl PositionManagerHandle { @@ -180,14 +190,23 @@ impl PositionManagerHandle { Self { sender } } - pub async fn update(&mut self, tick: u64) { - self.sender.send(SignalKind::Update(tick)).await.unwrap(); + pub async fn update(&mut self, tick: u64) -> Result { + let (send, recv) = oneshot::channel(); + + self.sender + .send(ActorMessage { + message: Message::Update { tick }, + respond_to: send, + }) + .await?; + + Ok(recv.await?) } } #[derive(Debug)] pub struct PositionManager { - receiver: Receiver, + receiver: Receiver, current_tick: u64, pair: SymbolPair, positions_history: HashMap, @@ -198,7 +217,7 @@ pub struct PositionManager { impl PositionManager { pub fn new( - receiver: Receiver, + receiver: Receiver, pair: SymbolPair, client: Client, strategy: Box, @@ -218,10 +237,12 @@ impl PositionManager { self.current_tick } - pub async fn handle_message(&mut self, msg: SignalKind) -> Result<(), BoxError> { - match msg { - SignalKind::Update(tick) => { - self.update(tick).await?; + pub async fn handle_message(&mut self, msg: ActorMessage) -> Result<(), BoxError> { + match msg.message { + Message::Update { tick } => { + let result = self.update(tick).await?; + + msg.respond_to.send(result); } _ => {} }; @@ -296,7 +317,7 @@ impl PositionManager { pub type TrackedPositionsMap = HashMap; pub struct OrderManagerHandle { - sender: Sender, + sender: Sender, } impl OrderManagerHandle { @@ -316,20 +337,35 @@ impl OrderManagerHandle { Self { sender } } - pub async fn update(&mut self, tick: u64) { - self.sender.send(SignalKind::Update(tick)).await.unwrap(); + pub async fn update(&mut self, tick: u64) -> Result { + let (send, recv) = oneshot::channel(); + + self.sender + .send(ActorMessage { + message: Message::Update { tick }, + respond_to: send, + }) + .await?; + + Ok(recv.await?) } - pub async fn close_position(&mut self, position: Position) { + pub async fn close_position(&mut self, position: Position) -> Result { + let (send, recv) = oneshot::channel(); + self.sender - .send(SignalKind::ClosePosition(position)) - .await - .unwrap(); + .send(ActorMessage { + message: Message::ClosePosition { position }, + respond_to: send, + }) + .await?; + + Ok(recv.await?) } } pub struct OrderManager { - receiver: Receiver, + receiver: Receiver, tracked_positions: TrackedPositionsMap, pair: SymbolPair, open_orders: Vec, @@ -341,7 +377,7 @@ impl OrderManager { const UNDERCUT_PERC: f64 = 0.005; pub fn new( - receiver: Receiver, + receiver: Receiver, pair: SymbolPair, client: Client, strategy: Box, @@ -356,12 +392,12 @@ impl OrderManager { } } - pub async fn handle_message(&mut self, msg: SignalKind) -> Result<(), BoxError> { - match msg { - SignalKind::Update(_) => { + pub async fn handle_message(&mut self, msg: ActorMessage) -> Result<(), BoxError> { + match msg.message { + Message::Update { .. } => { self.update(); } - SignalKind::ClosePosition(position) => self.close_position(&position).await?, + Message::ClosePosition { position, .. } => self.close_position(&position).await?, _ => {} }; @@ -438,7 +474,7 @@ pub struct PairManager { price_manager: PriceManagerHandle, order_manager: OrderManagerHandle, position_manager: PositionManagerHandle, - dispatcher: Dispatcher, + // dispatcher: Dispatcher, } impl PairManager { @@ -456,7 +492,7 @@ impl PairManager { client.clone(), Box::new(TrailingStop::new()), ), - dispatcher: Dispatcher::new(), + // dispatcher: Dispatcher::new(), } } } diff --git a/rustybot/src/strategy.rs b/rustybot/src/strategy.rs index c1b27ff..14b9d38 100644 --- a/rustybot/src/strategy.rs +++ b/rustybot/src/strategy.rs @@ -3,9 +3,10 @@ use std::fmt::{Debug, Formatter}; use dyn_clone::DynClone; -use crate::events::{Event, EventKind, EventMetadata, SignalKind}; +use crate::events::{Event, EventKind, EventMetadata, Message}; use crate::managers::{OrderManager, PositionManager, TrackedPositionsMap}; use crate::models::{ExecutedOrder, OrderForm, Position, PositionProfitState}; +use tokio::sync::oneshot; /*************** * DEFINITIONS @@ -17,7 +18,7 @@ pub trait PositionStrategy: DynClone + Send { &self, position: Position, manager: &PositionManager, - ) -> (Position, Option>, Option>); + ) -> (Position, Option>, Option>); } impl Debug for dyn PositionStrategy { @@ -84,7 +85,7 @@ impl PositionStrategy for TrailingStop { &self, position: Position, manager: &PositionManager, - ) -> (Position, Option>, Option>) { + ) -> (Position, Option>, Option>) { let mut signals = vec![]; let events = vec![]; @@ -102,7 +103,9 @@ impl PositionStrategy for TrailingStop { } else if TrailingStop::MAX_LOSS_PERC < pl_perc && pl_perc < 0.0 { PositionProfitState::Loss } else { - signals.push(SignalKind::ClosePosition(position.clone())); + signals.push(Message::ClosePosition { + position: position.clone(), + }); PositionProfitState::Critical } };