actor model stub + futures unordered

This commit is contained in:
Giulio De Pasquale 2021-01-16 21:38:00 +00:00
parent 268000b218
commit 03e9c94b3b
2 changed files with 60 additions and 32 deletions

View File

@ -8,6 +8,7 @@ use crate::models::{Position, PositionProfitState};
#[derive(Clone, Debug, Hash, PartialEq, Eq)] #[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum SignalKind { pub enum SignalKind {
Update(u64),
ClosePosition(Position), ClosePosition(Position),
OpenPosition, OpenPosition,
} }

View File

@ -2,7 +2,9 @@ use std::collections::HashMap;
use std::ops::Neg; use std::ops::Neg;
use bitfinex::ticker::TradingPairTicker; use bitfinex::ticker::TradingPairTicker;
use log::error; use log::{debug, error};
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::connectors::{Client, ExchangeKind}; use crate::connectors::{Client, ExchangeKind};
use crate::currency::SymbolPair; use crate::currency::SymbolPair;
@ -10,28 +12,68 @@ use crate::events::{Dispatcher, Event, SignalKind};
use crate::models::{ExecutedOrder, OrderForm, OrderKind, Position, PriceTicker}; use crate::models::{ExecutedOrder, OrderForm, OrderKind, Position, PriceTicker};
use crate::strategy::{FastOrderStrategy, OrderStrategy, PositionStrategy}; use crate::strategy::{FastOrderStrategy, OrderStrategy, PositionStrategy};
use crate::BoxError; use crate::BoxError;
use tokio::sync::mpsc::Receiver; use futures_util::stream::FuturesUnordered;
use futures_util::StreamExt;
pub struct EventManager { pub struct EventManager {
events: Vec<Event>, events: Vec<Event>,
} }
#[derive(Clone, Debug)] #[derive(Debug)]
pub struct PriceManager { pub struct PriceManager {
receiver: Receiver<SignalKind>,
pair: SymbolPair, pair: SymbolPair,
prices: Vec<PriceEntry>, prices: Vec<PriceEntry>,
client: Client, client: Client,
} }
impl PriceManager { async fn run_price_manager(mut manager: PriceManager) {
while let Some(msg) = manager.receiver.recv().await {
manager.handle_message(msg).await.unwrap();
}
}
pub struct PriceManagerHandle {
sender: Sender<SignalKind>,
}
impl PriceManagerHandle {
pub fn new(pair: SymbolPair, client: Client) -> Self { pub fn new(pair: SymbolPair, client: Client) -> Self {
let (sender, receiver) = channel(8);
let price_manager = PriceManager::new(receiver, pair, client);
tokio::spawn(run_price_manager(price_manager));
Self { sender }
}
pub async fn update(&mut self, tick: u64) {
self.sender.send(SignalKind::Update(tick)).await.unwrap();
}
}
impl PriceManager {
pub fn new(receiver: Receiver<SignalKind>, pair: SymbolPair, client: Client) -> Self {
PriceManager { PriceManager {
receiver,
pair, pair,
prices: Vec::new(), prices: Vec::new(),
client, client,
} }
} }
pub async fn handle_message(&mut self, message: SignalKind) -> Result<(), BoxError> {
match message {
SignalKind::Update(tick) => {
let a = self.update(tick).await?;
self.add_entry(a);
}
_ => {}
}
Ok(())
}
pub fn add_entry(&mut self, entry: PriceEntry) { pub fn add_entry(&mut self, entry: PriceEntry) {
self.prices.push(entry); self.prices.push(entry);
} }
@ -296,7 +338,7 @@ impl OrderManager {
pub struct ExchangeManager { pub struct ExchangeManager {
kind: ExchangeKind, kind: ExchangeKind,
price_managers: Vec<PriceManager>, price_managers: Vec<PriceManagerHandle>,
order_managers: Vec<OrderManager>, order_managers: Vec<OrderManager>,
position_managers: Vec<PositionManager>, position_managers: Vec<PositionManager>,
dispatcher: Dispatcher, dispatcher: Dispatcher,
@ -318,7 +360,7 @@ impl ExchangeManager {
client.clone(), client.clone(),
Box::new(FastOrderStrategy {}), Box::new(FastOrderStrategy {}),
)); ));
price_managers.push(PriceManager::new(p.clone(), client.clone())); price_managers.push(PriceManagerHandle::new(p.clone(), client.clone()));
} }
ExchangeManager { ExchangeManager {
@ -352,40 +394,25 @@ impl ExchangeManager {
&mut self, &mut self,
tick: u64, tick: u64,
) -> Result<Option<Vec<Event>>, BoxError> { ) -> Result<Option<Vec<Event>>, BoxError> {
for mgr in &mut self.position_managers { let mut futures: FuturesUnordered<_> = self
mgr.update(tick).await?; .position_managers
} .iter_mut()
.map(|x| x.update(tick))
.collect();
while let Some(x) = futures.next().await {}
Ok(None) Ok(None)
} }
async fn update_price_managers(&mut self, tick: u64) -> Result<Option<Vec<Event>>, BoxError> { async fn update_price_managers(&mut self, tick: u64) -> Result<Option<Vec<Event>>, BoxError> {
let futures: Vec<_> = self let mut futures: FuturesUnordered<_> = self
.price_managers .price_managers
.clone() .iter_mut()
.into_iter() .map(|x| x.update(tick))
// the only reason you need the async block is that the future
// returned by x.update(tick) borrows from x
// so we create a future that first takes ownership of x, then uses it to call x.update
.map(|mut x| async move { x.update(tick).await })
.map(tokio::spawn)
.collect(); .collect();
let mut price_entries = vec![]; while let Some(x) = futures.next().await {}
for f in futures {
price_entries.push(f.await??);
}
for manager in &mut self.price_managers {
let prices: Vec<_> = price_entries
.drain_filter(|x| x.pair() == manager.pair())
.collect();
for p in prices {
manager.add_entry(p);
}
}
Ok(None) Ok(None)
} }