From 503c542a5f520afd51e3b31b2a79533e04d011fd Mon Sep 17 00:00:00 2001 From: Giulio De Pasquale Date: Sun, 17 Jan 2021 18:18:16 +0000 Subject: [PATCH] positionmanager is now an actor as well --- rustybot/src/bot.rs | 10 ---- rustybot/src/main.rs | 3 +- rustybot/src/managers.rs | 120 +++++++++++++++++++++++++-------------- rustybot/src/strategy.rs | 4 +- 4 files changed, 81 insertions(+), 56 deletions(-) diff --git a/rustybot/src/bot.rs b/rustybot/src/bot.rs index 0a5fc46..c8f84dd 100644 --- a/rustybot/src/bot.rs +++ b/rustybot/src/bot.rs @@ -44,16 +44,6 @@ impl BfxBot { } } - pub fn with_position_strategy(mut self, strategy: Box) -> Self { - self.exchange_managers = self - .exchange_managers - .into_iter() - .map(|x| x.with_position_strategy(dyn_clone::clone_box(&*strategy))) - .collect(); - - self - } - pub async fn start_loop(&mut self) -> Result<(), BoxError> { self.update_exchanges().await?; diff --git a/rustybot/src/main.rs b/rustybot/src/main.rs index 0d42b51..1dde9fc 100644 --- a/rustybot/src/main.rs +++ b/rustybot/src/main.rs @@ -38,8 +38,7 @@ async fn main() -> Result<(), BoxError> { vec![Symbol::TESTBTC], Symbol::TESTUSD, Duration::new(1, 0), - ) - .with_position_strategy(Box::new(TrailingStop::new())); + ); Ok(bot.start_loop().await?) } diff --git a/rustybot/src/managers.rs b/rustybot/src/managers.rs index a89c14a..e8d01a2 100644 --- a/rustybot/src/managers.rs +++ b/rustybot/src/managers.rs @@ -2,7 +2,10 @@ use std::collections::HashMap; use std::ops::Neg; use bitfinex::ticker::TradingPairTicker; +use futures_util::stream::FuturesUnordered; +use futures_util::StreamExt; use log::{debug, error}; +use tokio::signal::unix::Signal; use tokio::sync::mpsc::channel; use tokio::sync::mpsc::{Receiver, Sender}; @@ -10,15 +13,17 @@ use crate::connectors::{Client, ExchangeKind}; use crate::currency::SymbolPair; use crate::events::{Dispatcher, Event, SignalKind}; use crate::models::{ExecutedOrder, OrderForm, OrderKind, Position, PriceTicker}; -use crate::strategy::{FastOrderStrategy, OrderStrategy, PositionStrategy}; +use crate::strategy::{FastOrderStrategy, OrderStrategy, PositionStrategy, TrailingStop}; use crate::BoxError; -use futures_util::stream::FuturesUnordered; -use futures_util::StreamExt; pub struct EventManager { events: Vec, } +/****************** +* PRICES +******************/ + #[derive(Debug)] pub struct PriceManager { receiver: Receiver, @@ -27,22 +32,22 @@ pub struct PriceManager { client: Client, } -async fn run_price_manager(mut manager: PriceManager) { - while let Some(msg) = manager.receiver.recv().await { - manager.handle_message(msg).await.unwrap(); - } -} - pub struct PriceManagerHandle { sender: Sender, } impl PriceManagerHandle { + async fn run_price_manager(mut manager: PriceManager) { + while let Some(msg) = manager.receiver.recv().await { + manager.handle_message(msg).await.unwrap(); + } + } + pub fn new(pair: SymbolPair, client: Client) -> Self { let (sender, receiver) = channel(8); let price_manager = PriceManager::new(receiver, pair, client); - tokio::spawn(run_price_manager(price_manager)); + tokio::spawn(PriceManagerHandle::run_price_manager(price_manager)); Self { sender } } @@ -148,37 +153,79 @@ impl PriceEntry { } } +/****************** +* POSITIONS +******************/ + +pub struct PositionManagerHandle { + sender: Sender, +} + +impl PositionManagerHandle { + async fn run_position_manager(mut manager: PositionManager) { + while let Some(msg) = manager.receiver.recv().await { + manager.handle_message(msg).await.unwrap(); + } + } + + pub fn new(pair: SymbolPair, client: Client, strategy: Box) -> Self { + let (sender, receiver) = channel(8); + + let manager = PositionManager::new(receiver, pair, client, strategy); + + tokio::spawn(PositionManagerHandle::run_position_manager(manager)); + + Self { sender } + } + + pub async fn update(&mut self, tick: u64) { + self.sender.send(SignalKind::Update(tick)).await.unwrap(); + } +} + #[derive(Debug)] pub struct PositionManager { + receiver: Receiver, current_tick: u64, pair: SymbolPair, positions_history: HashMap, active_position: Option, client: Client, - strategy: Option>, + strategy: Box, } impl PositionManager { - pub fn new(pair: SymbolPair, client: Client) -> Self { + pub fn new( + receiver: Receiver, + pair: SymbolPair, + client: Client, + strategy: Box, + ) -> Self { PositionManager { + receiver, current_tick: 0, pair, positions_history: HashMap::new(), active_position: None, client, - strategy: None, + strategy, } } - pub fn with_strategy(mut self, strategy: Box) -> Self { - self.strategy = Some(strategy); - self - } - pub fn current_tick(&self) -> u64 { self.current_tick } + pub async fn handle_message(&mut self, msg: SignalKind) -> Result<(), BoxError> { + match msg { + SignalKind::Update(tick) => { + self.update(tick).await?; + } + _ => {} + }; + + Ok(()) + } pub async fn update( &mut self, tick: u64, @@ -205,19 +252,10 @@ impl PositionManager { // applying strategy to open position and saving into struct Some(position) => { - let position_after_strategy = { - match &self.strategy { - Some(strategy) => { - let (pos, strategy_events, _) = - strategy.on_new_tick(position, &self); + let (position_after_strategy, strategy_events, _) = + self.strategy.on_new_tick(position, &self); - events.extend(strategy_events); - - pos - } - None => position, - } - }; + events.extend(strategy_events); self.positions_history .insert(self.current_tick(), position_after_strategy.clone()); @@ -249,6 +287,10 @@ impl PositionManager { } } +/****************** +* ORDERS +******************/ + pub type TrackedPositionsMap = HashMap; pub struct OrderManager { @@ -340,7 +382,7 @@ pub struct ExchangeManager { kind: ExchangeKind, price_managers: Vec, order_managers: Vec, - position_managers: Vec, + position_managers: Vec, dispatcher: Dispatcher, client: Client, } @@ -354,7 +396,11 @@ impl ExchangeManager { let mut price_managers = Vec::new(); for p in pairs { - position_managers.push(PositionManager::new(p.clone(), client.clone())); + position_managers.push(PositionManagerHandle::new( + p.clone(), + client.clone(), + Box::new(TrailingStop::new()), + )); order_managers.push(OrderManager::new( p.clone(), client.clone(), @@ -373,16 +419,6 @@ impl ExchangeManager { } } - pub fn with_position_strategy(mut self, strategy: Box) -> Self { - self.position_managers = self - .position_managers - .into_iter() - .map(|x| x.with_strategy(dyn_clone::clone_box(&*strategy))) - .collect(); - - self - } - pub async fn update_managers(&mut self, tick: u64) -> Result<(), BoxError> { self.update_price_managers(tick).await?; self.update_position_managers(tick).await?; diff --git a/rustybot/src/strategy.rs b/rustybot/src/strategy.rs index fb1dce1..e238a8a 100644 --- a/rustybot/src/strategy.rs +++ b/rustybot/src/strategy.rs @@ -11,7 +11,7 @@ use crate::models::{ExecutedOrder, OrderForm, Position, PositionProfitState}; * DEFINITIONS ***************/ -pub trait PositionStrategy: DynClone { +pub trait PositionStrategy: DynClone + Send { fn name(&self) -> String; fn on_new_tick( &self, @@ -26,7 +26,7 @@ impl Debug for dyn PositionStrategy { } } -pub trait OrderStrategy: DynClone { +pub trait OrderStrategy: DynClone + Send { /// The name of the strategy, used for debugging purposes fn name(&self) -> String; /// This method is called when the OrderManager checks the open orders on a new tick.