use std::collections::HashMap; use std::ops::Neg; use bitfinex::ticker::TradingPairTicker; use futures_util::stream::FuturesUnordered; use futures_util::StreamExt; use log::{debug, error}; use tokio::signal::unix::Signal; use tokio::sync::mpsc::channel; use tokio::sync::mpsc::{Receiver, Sender}; use crate::connectors::{Client, ExchangeKind}; use crate::currency::SymbolPair; use crate::events::{Dispatcher, Event, SignalKind}; use crate::models::{ExecutedOrder, OrderForm, OrderKind, Position, PriceTicker}; use crate::strategy::{FastOrderStrategy, OrderStrategy, PositionStrategy, TrailingStop}; use crate::BoxError; pub struct EventManager { events: Vec, } /****************** * PRICES ******************/ #[derive(Debug)] pub struct PriceManager { receiver: Receiver, pair: SymbolPair, prices: Vec, client: Client, } pub struct PriceManagerHandle { sender: Sender, } impl PriceManagerHandle { async fn run_price_manager(mut manager: PriceManager) { while let Some(msg) = manager.receiver.recv().await { manager.handle_message(msg).await.unwrap(); } } pub fn new(pair: SymbolPair, client: Client) -> Self { let (sender, receiver) = channel(8); let price_manager = PriceManager::new(receiver, pair, client); tokio::spawn(PriceManagerHandle::run_price_manager(price_manager)); Self { sender } } pub async fn update(&mut self, tick: u64) { self.sender.send(SignalKind::Update(tick)).await.unwrap(); } } impl PriceManager { pub fn new(receiver: Receiver, pair: SymbolPair, client: Client) -> Self { PriceManager { receiver, pair, prices: Vec::new(), client, } } pub async fn handle_message(&mut self, message: SignalKind) -> Result<(), BoxError> { match message { SignalKind::Update(tick) => { let a = self.update(tick).await?; self.add_entry(a); } _ => {} } Ok(()) } pub fn add_entry(&mut self, entry: PriceEntry) { self.prices.push(entry); } pub async fn update(&mut self, tick: u64) -> Result { let current_prices = self.client.current_prices(&self.pair).await?.into(); Ok(PriceEntry::new( tick, current_prices, self.pair.clone(), None, None, )) } // fn add_event(&mut self, event: Event) { // self.events.push(event); // // self.dispatcher.call_event_handlers(&event, &self); // } // // fn add_signal(&mut self, signal: SignalKind) { // self.signals.insert(self.current_tick(), signal); // } pub fn pair(&self) -> &SymbolPair { &self.pair } } #[derive(Clone, Debug)] pub struct PriceEntry { tick: u64, pair: SymbolPair, price: PriceTicker, events: Option>, signals: Option>, } impl PriceEntry { pub fn new( tick: u64, price: PriceTicker, pair: SymbolPair, events: Option>, signals: Option>, ) -> Self { PriceEntry { tick, pair, price, events, signals, } } pub fn tick(&self) -> u64 { self.tick } pub fn pair(&self) -> &SymbolPair { &self.pair } pub fn price(&self) -> PriceTicker { self.price } pub fn events(&self) -> &Option> { &self.events } pub fn signals(&self) -> &Option> { &self.signals } } /****************** * POSITIONS ******************/ pub struct PositionManagerHandle { sender: Sender, } impl PositionManagerHandle { async fn run_position_manager(mut manager: PositionManager) { while let Some(msg) = manager.receiver.recv().await { manager.handle_message(msg).await.unwrap(); } } pub fn new(pair: SymbolPair, client: Client, strategy: Box) -> Self { let (sender, receiver) = channel(8); let manager = PositionManager::new(receiver, pair, client, strategy); tokio::spawn(PositionManagerHandle::run_position_manager(manager)); Self { sender } } pub async fn update(&mut self, tick: u64) { self.sender.send(SignalKind::Update(tick)).await.unwrap(); } } #[derive(Debug)] pub struct PositionManager { receiver: Receiver, current_tick: u64, pair: SymbolPair, positions_history: HashMap, active_position: Option, client: Client, strategy: Box, } impl PositionManager { pub fn new( receiver: Receiver, pair: SymbolPair, client: Client, strategy: Box, ) -> Self { PositionManager { receiver, current_tick: 0, pair, positions_history: HashMap::new(), active_position: None, client, strategy, } } pub fn current_tick(&self) -> u64 { self.current_tick } pub async fn handle_message(&mut self, msg: SignalKind) -> Result<(), BoxError> { match msg { SignalKind::Update(tick) => { self.update(tick).await?; } _ => {} }; Ok(()) } pub async fn update( &mut self, tick: u64, ) -> Result<(Option>, Option), BoxError> { let opt_active_positions = self.client.active_positions(&self.pair).await?; let mut events = vec![]; self.current_tick = tick; // we assume there is only ONE active position per pair match opt_active_positions { // no open positions, no events and no order forms returned None => return Ok((None, None)), Some(positions) => { // checking if there are positions open for our pair match positions .into_iter() .filter(|x| x.pair() == &self.pair) .next() { // no open positions for our pair, setting active position to none None => self.active_position = None, // applying strategy to open position and saving into struct Some(position) => { let (position_after_strategy, strategy_events, _) = self.strategy.on_new_tick(position, &self); events.extend(strategy_events); self.positions_history .insert(self.current_tick(), position_after_strategy.clone()); self.active_position = Some(position_after_strategy); } } } }; Ok(((events.is_empty().then_some(events)), None)) } pub fn position_previous_tick(&self, id: u64, tick: Option) -> Option<&Position> { let tick = match tick { Some(tick) => { if tick < 1 { 1 } else { tick } } None => self.current_tick() - 1, }; self.positions_history .get(&tick) .filter(|x| x.id() == id) .and_then(|x| Some(x)) } } /****************** * ORDERS ******************/ pub type TrackedPositionsMap = HashMap; pub struct OrderManager { // receiver: Receiver, tracked_positions: TrackedPositionsMap, pair: SymbolPair, open_orders: Vec, client: Client, strategy: Box, } impl OrderManager { const UNDERCUT_PERC: f64 = 0.005; pub fn new( // receiver: Receiver, pair: SymbolPair, client: Client, strategy: Box, ) -> Self { OrderManager { // receiver, pair, open_orders: Vec::new(), client, strategy, tracked_positions: HashMap::new(), } } pub async fn close_position(&mut self, position: &Position) -> Result<(), BoxError> { let open_order = self.tracked_positions.get(&position.id()); // checking if the position has an open order. // If so, the strategy method is called, otherwise we open // an undercut limit order at the best current price. match open_order { Some(open_order) => { self.tracked_positions = self .strategy .on_position_close(open_order, &self.tracked_positions); } None => { let current_prices = self.client.current_prices(&self.pair).await?; let closing_price = self.best_closing_price(&position, ¤t_prices)?; // submitting order let order_form = OrderForm::new( &self.pair, closing_price, position.amount().neg(), OrderKind::Limit, ); match self.client.submit_order(order_form).await { Err(e) => error!("Could not submit order: {}", e), Ok(o) => { self.tracked_positions.insert(position.id(), o); } }; } } Ok(()) } pub fn update(&self) -> Option> { unimplemented!() } pub fn best_closing_price( &self, position: &Position, price_ticker: &TradingPairTicker, ) -> Result { let price = { if position.is_short() { price_ticker.ask } else { price_ticker.bid } }; Ok(price * (1.0 - OrderManager::UNDERCUT_PERC)) } } pub struct ExchangeManager { kind: ExchangeKind, price_managers: Vec, order_managers: Vec, position_managers: Vec, dispatcher: Dispatcher, client: Client, } impl ExchangeManager { pub fn new(kind: &ExchangeKind, pairs: &Vec) -> Self { let client = Client::new(kind); let mut position_managers = Vec::new(); let mut order_managers = Vec::new(); let mut price_managers = Vec::new(); for p in pairs { position_managers.push(PositionManagerHandle::new( p.clone(), client.clone(), Box::new(TrailingStop::new()), )); order_managers.push(OrderManager::new( p.clone(), client.clone(), Box::new(FastOrderStrategy {}), )); price_managers.push(PriceManagerHandle::new(p.clone(), client.clone())); } ExchangeManager { kind: kind.clone(), position_managers, order_managers, price_managers, client, dispatcher: Dispatcher::new(), } } pub async fn update_managers(&mut self, tick: u64) -> Result<(), BoxError> { self.update_price_managers(tick).await?; self.update_position_managers(tick).await?; Ok(()) } async fn update_position_managers( &mut self, tick: u64, ) -> Result>, BoxError> { let mut futures: FuturesUnordered<_> = self .position_managers .iter_mut() .map(|x| x.update(tick)) .collect(); while let Some(x) = futures.next().await {} Ok(None) } async fn update_price_managers(&mut self, tick: u64) -> Result>, BoxError> { let mut futures: FuturesUnordered<_> = self .price_managers .iter_mut() .map(|x| x.update(tick)) .collect(); while let Some(x) = futures.next().await {} Ok(None) } }