diff --git a/rustybot/src/bot.rs b/rustybot/src/bot.rs index 1cd951d..0a5fc46 100644 --- a/rustybot/src/bot.rs +++ b/rustybot/src/bot.rs @@ -1,4 +1,5 @@ use core::time::Duration; +use std::collections::HashMap; use log::{debug, error, info}; use tokio::time::delay_for; @@ -6,7 +7,7 @@ use tokio::time::delay_for; use crate::connectors::{Client, ExchangeKind}; use crate::currency::{Symbol, SymbolPair}; use crate::events::Event; -use crate::managers::{OrderManager, PositionManager, PriceManager}; +use crate::managers::{ExchangeManager, OrderManager, PositionManager, PriceManager}; use crate::strategy::PositionStrategy; use crate::ticker::Ticker; use crate::BoxError; @@ -15,9 +16,7 @@ pub struct BfxBot { ticker: Ticker, quote: Symbol, trading_symbols: Vec, - price_managers: Vec, - order_managers: Vec, - pos_managers: Vec, + exchange_managers: Vec, } impl BfxBot { @@ -27,48 +26,36 @@ impl BfxBot { quote: Symbol, tick_duration: Duration, ) -> Self { - let clients: Vec<_> = exchanges.iter().map(|x| Client::new(x)).collect(); let pairs: Vec<_> = trading_symbols .iter() .map(|x| SymbolPair::new(quote.clone(), x.clone())) .collect(); - let mut pos_managers = Vec::new(); - let mut order_managers = Vec::new(); - let mut price_managers = Vec::new(); - - for c in clients { - for p in &pairs { - pos_managers.push(PositionManager::new(p.clone(), c.clone())); - order_managers.push(OrderManager::new(p.clone(), c.clone())); - price_managers.push(PriceManager::new(p.clone(), c.clone())); - } - } + let exchange_managers = exchanges + .iter() + .map(|x| ExchangeManager::new(x, &pairs)) + .collect(); BfxBot { ticker: Ticker::new(tick_duration), - price_managers, quote, trading_symbols, - order_managers, - pos_managers, + exchange_managers, } } pub fn with_position_strategy(mut self, strategy: Box) -> Self { - self.pos_managers = self - .pos_managers + self.exchange_managers = self + .exchange_managers .into_iter() - .map(|x| x.with_strategy(dyn_clone::clone_box(&*strategy))) + .map(|x| x.with_position_strategy(dyn_clone::clone_box(&*strategy))) .collect(); self } pub async fn start_loop(&mut self) -> Result<(), BoxError> { - if let Err(e) = self.update_managers().await { - error!("Error while starting managers: {}", e); - } + self.update_exchanges().await?; loop { info!("Current tick: {}", self.ticker.current_tick()); @@ -79,65 +66,22 @@ impl BfxBot { } } + async fn update_exchanges(&mut self) -> Result<(), BoxError> { + for e in &mut self.exchange_managers { + if let Err(err) = e.update_managers(self.ticker.current_tick()).await { + error!("Error while updating managers: {}", err); + } + } + + Ok(()) + } + async fn update(&mut self) -> Result<(), BoxError> { delay_for(self.ticker.duration()).await; self.ticker.inc(); - if let Err(e) = self.update_managers().await { - error!("Error while updating managers: {}", e); - } + self.update_exchanges().await?; Ok(()) } - - async fn update_managers(&mut self) -> Result<(), BoxError> { - self.update_price_managers().await?; - self.update_position_managers().await?; - - Ok(()) - } - - async fn update_position_managers(&mut self) -> Result>, BoxError> { - for mgr in &mut self.pos_managers { - let tick = self.ticker.current_tick(); - - mgr.update(tick).await?; - } - - Ok(None) - } - - async fn update_price_managers(&mut self) -> Result>, BoxError> { - let futures: Vec<_> = self - .price_managers - .clone() - .into_iter() - // the only reason you need the async block is that the future - // returned by x.update(tick) borrows from x - // so we create a future that first takes ownership of x, then uses it to call x.update - .map(|mut x| { - let tick = self.ticker.current_tick(); - async move { x.update(tick).await } - }) - .map(tokio::spawn) - .collect(); - - let mut price_entries = vec![]; - - for f in futures { - price_entries.push(f.await??); - } - - for manager in &mut self.price_managers { - let prices: Vec<_> = price_entries - .drain_filter(|x| x.pair() == manager.pair()) - .collect(); - - for p in prices { - manager.add_entry(p); - } - } - - Ok(None) - } } diff --git a/rustybot/src/main.rs b/rustybot/src/main.rs index a280619..accaf6d 100644 --- a/rustybot/src/main.rs +++ b/rustybot/src/main.rs @@ -26,12 +26,10 @@ async fn main() -> Result<(), BoxError> { let test_api_key = "P1EVE68DJByDAkGQvpIkTwfrbYXd2Vo2ZaIhTYb9vx2"; let test_api_secret = "1nicg8z0zKVEt5Rb7ZDpIYjVYVTgvCaCPMZqB0niFli"; - let affiliate_code = "XPebOgHxA"; let bitfinex = ExchangeKind::Bitfinex { api_key: test_api_key.into(), api_secret: test_api_secret.into(), - affiliate_code: Some(affiliate_code.into()), }; let mut bot = BfxBot::new( @@ -65,7 +63,7 @@ fn setup_logger() -> Result<(), fern::InitError> { .level(Debug) .filter(|metadata| metadata.target().contains("rustybot")) .chain(std::io::stdout()) - .chain(fern::log_file("rustico.log")?) + // .chain(fern::log_file("rustico.log")?) .apply()?; Ok(()) diff --git a/rustybot/src/managers.rs b/rustybot/src/managers.rs index 4f28192..631137e 100644 --- a/rustybot/src/managers.rs +++ b/rustybot/src/managers.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use crate::connectors::Client; +use crate::connectors::{Client, ExchangeKind}; use crate::currency::SymbolPair; use crate::events::{Event, SignalKind}; use crate::models::{Order, Position, PriceTicker}; @@ -245,3 +245,95 @@ impl OrderManager { unimplemented!() } } + +pub struct ExchangeManager { + kind: ExchangeKind, + price_managers: Vec, + order_managers: Vec, + position_managers: Vec, + client: Client, +} + +impl ExchangeManager { + pub fn new(kind: &ExchangeKind, pairs: &Vec) -> Self { + let client = Client::new(kind); + + let mut position_managers = Vec::new(); + let mut order_managers = Vec::new(); + let mut price_managers = Vec::new(); + + for p in pairs { + position_managers.push(PositionManager::new(p.clone(), client.clone())); + order_managers.push(OrderManager::new(p.clone(), client.clone())); + price_managers.push(PriceManager::new(p.clone(), client.clone())); + } + + ExchangeManager { + kind: kind.clone(), + position_managers, + order_managers, + price_managers, + client, + } + } + + pub fn with_position_strategy(mut self, strategy: Box) -> Self { + self.position_managers = self + .position_managers + .into_iter() + .map(|x| x.with_strategy(dyn_clone::clone_box(&*strategy))) + .collect(); + + self + } + + pub async fn update_managers(&mut self, tick: u64) -> Result<(), BoxError> { + self.update_price_managers(tick).await?; + self.update_position_managers(tick).await?; + + Ok(()) + } + + async fn update_position_managers( + &mut self, + tick: u64, + ) -> Result>, BoxError> { + for mgr in &mut self.position_managers { + println!("Manager: {:?}", mgr); + mgr.update(tick).await?; + } + + Ok(None) + } + + async fn update_price_managers(&mut self, tick: u64) -> Result>, BoxError> { + let futures: Vec<_> = self + .price_managers + .clone() + .into_iter() + // the only reason you need the async block is that the future + // returned by x.update(tick) borrows from x + // so we create a future that first takes ownership of x, then uses it to call x.update + .map(|mut x| async move { x.update(tick).await }) + .map(tokio::spawn) + .collect(); + + let mut price_entries = vec![]; + + for f in futures { + price_entries.push(f.await??); + } + + for manager in &mut self.price_managers { + let prices: Vec<_> = price_entries + .drain_filter(|x| x.pair() == manager.pair()) + .collect(); + + for p in prices { + manager.add_entry(p); + } + } + + Ok(None) + } +}