diff --git a/rustybot/src/events.rs b/rustybot/src/events.rs index 4e6868e..e2064bb 100644 --- a/rustybot/src/events.rs +++ b/rustybot/src/events.rs @@ -8,6 +8,7 @@ use crate::models::{Position, PositionProfitState}; #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub enum SignalKind { + Update(u64), ClosePosition(Position), OpenPosition, } diff --git a/rustybot/src/managers.rs b/rustybot/src/managers.rs index 8977d43..a89c14a 100644 --- a/rustybot/src/managers.rs +++ b/rustybot/src/managers.rs @@ -2,7 +2,9 @@ use std::collections::HashMap; use std::ops::Neg; use bitfinex::ticker::TradingPairTicker; -use log::error; +use log::{debug, error}; +use tokio::sync::mpsc::channel; +use tokio::sync::mpsc::{Receiver, Sender}; use crate::connectors::{Client, ExchangeKind}; use crate::currency::SymbolPair; @@ -10,28 +12,68 @@ use crate::events::{Dispatcher, Event, SignalKind}; use crate::models::{ExecutedOrder, OrderForm, OrderKind, Position, PriceTicker}; use crate::strategy::{FastOrderStrategy, OrderStrategy, PositionStrategy}; use crate::BoxError; -use tokio::sync::mpsc::Receiver; +use futures_util::stream::FuturesUnordered; +use futures_util::StreamExt; pub struct EventManager { events: Vec, } -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct PriceManager { + receiver: Receiver, pair: SymbolPair, prices: Vec, client: Client, } -impl PriceManager { +async fn run_price_manager(mut manager: PriceManager) { + while let Some(msg) = manager.receiver.recv().await { + manager.handle_message(msg).await.unwrap(); + } +} + +pub struct PriceManagerHandle { + sender: Sender, +} + +impl PriceManagerHandle { pub fn new(pair: SymbolPair, client: Client) -> Self { + let (sender, receiver) = channel(8); + + let price_manager = PriceManager::new(receiver, pair, client); + tokio::spawn(run_price_manager(price_manager)); + + Self { sender } + } + + pub async fn update(&mut self, tick: u64) { + self.sender.send(SignalKind::Update(tick)).await.unwrap(); + } +} + +impl PriceManager { + pub fn new(receiver: Receiver, pair: SymbolPair, client: Client) -> Self { PriceManager { + receiver, pair, prices: Vec::new(), client, } } + pub async fn handle_message(&mut self, message: SignalKind) -> Result<(), BoxError> { + match message { + SignalKind::Update(tick) => { + let a = self.update(tick).await?; + self.add_entry(a); + } + _ => {} + } + + Ok(()) + } + pub fn add_entry(&mut self, entry: PriceEntry) { self.prices.push(entry); } @@ -296,7 +338,7 @@ impl OrderManager { pub struct ExchangeManager { kind: ExchangeKind, - price_managers: Vec, + price_managers: Vec, order_managers: Vec, position_managers: Vec, dispatcher: Dispatcher, @@ -318,7 +360,7 @@ impl ExchangeManager { client.clone(), Box::new(FastOrderStrategy {}), )); - price_managers.push(PriceManager::new(p.clone(), client.clone())); + price_managers.push(PriceManagerHandle::new(p.clone(), client.clone())); } ExchangeManager { @@ -352,40 +394,25 @@ impl ExchangeManager { &mut self, tick: u64, ) -> Result>, BoxError> { - for mgr in &mut self.position_managers { - mgr.update(tick).await?; - } + let mut futures: FuturesUnordered<_> = self + .position_managers + .iter_mut() + .map(|x| x.update(tick)) + .collect(); + + while let Some(x) = futures.next().await {} Ok(None) } async fn update_price_managers(&mut self, tick: u64) -> Result>, BoxError> { - let futures: Vec<_> = self + let mut futures: FuturesUnordered<_> = self .price_managers - .clone() - .into_iter() - // the only reason you need the async block is that the future - // returned by x.update(tick) borrows from x - // so we create a future that first takes ownership of x, then uses it to call x.update - .map(|mut x| async move { x.update(tick).await }) - .map(tokio::spawn) + .iter_mut() + .map(|x| x.update(tick)) .collect(); - let mut price_entries = vec![]; - - for f in futures { - price_entries.push(f.await??); - } - - for manager in &mut self.price_managers { - let prices: Vec<_> = price_entries - .drain_filter(|x| x.pair() == manager.pair()) - .collect(); - - for p in prices { - manager.add_entry(p); - } - } + while let Some(x) = futures.next().await {} Ok(None) }