use std::collections::{HashMap, HashSet}; use std::ops::Neg; use futures_util::stream::FuturesUnordered; use futures_util::StreamExt; use log::{debug, error, info, trace}; use merge::Merge; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::channel; use tokio::sync::oneshot; use tokio::time::Duration; use crate::BoxError; use crate::connectors::{Client, ExchangeDetails}; use crate::currency::SymbolPair; use crate::events::{ActionMessage, ActorMessage, Event}; use crate::models::{ActiveOrder, OrderBook, OrderForm, OrderKind, OrderMetadata, Position, PriceTicker}; use crate::strategy::{TrailingStop, MarketEnforce, PositionStrategy}; pub type OptionUpdate = (Option>, Option>); /****************** * PRICES ******************/ #[derive(Debug)] pub struct PriceManager { receiver: Receiver, pair: SymbolPair, prices: Vec, client: Client, } impl PriceManager { pub fn new(receiver: Receiver, pair: SymbolPair, client: Client) -> Self { PriceManager { receiver, pair, prices: Vec::new(), client, } } pub async fn handle_message(&mut self, message: ActorMessage) -> Result<(), BoxError> { if let ActionMessage::Update { tick } = message.message { let a = self.update(tick).await?; self.add_entry(a); } Ok(message .respond_to .send((None, None)) .map_err(|_| BoxError::from("Could not send message."))?) } pub fn add_entry(&mut self, entry: PriceEntry) { self.prices.push(entry); } pub async fn update(&mut self, tick: u64) -> Result { let current_prices = self.client.current_prices(&self.pair).await?.into(); Ok(PriceEntry::new( tick, current_prices, self.pair.clone(), None, )) } pub fn pair(&self) -> &SymbolPair { &self.pair } } pub struct PriceManagerHandle { sender: Sender, } impl PriceManagerHandle { async fn run_price_manager(mut manager: PriceManager) { while let Some(msg) = manager.receiver.recv().await { manager.handle_message(msg).await.unwrap(); } } pub fn new(pair: SymbolPair, client: Client) -> Self { let (sender, receiver) = channel(1); let price_manager = PriceManager::new(receiver, pair, client); tokio::spawn(PriceManagerHandle::run_price_manager(price_manager)); Self { sender } } pub async fn update(&mut self, tick: u64) -> Result { let (send, recv) = oneshot::channel(); self.sender .send(ActorMessage { message: ActionMessage::Update { tick }, respond_to: send, }) .await?; Ok(recv.await?) } } #[derive(Clone, Debug)] pub struct PriceEntry { tick: u64, pair: SymbolPair, price: PriceTicker, events: Option>, } impl PriceEntry { pub fn new( tick: u64, price: PriceTicker, pair: SymbolPair, events: Option>, ) -> Self { PriceEntry { tick, pair, price, events, } } pub fn tick(&self) -> u64 { self.tick } pub fn pair(&self) -> &SymbolPair { &self.pair } pub fn price(&self) -> PriceTicker { self.price } pub fn events(&self) -> &Option> { &self.events } } /****************** * POSITIONS ******************/ pub struct PositionManagerHandle { sender: Sender, } impl PositionManagerHandle { async fn run_position_manager(mut manager: PositionManager) { while let Some(msg) = manager.receiver.recv().await { manager.handle_message(msg).await.unwrap(); } } pub fn new(pair: SymbolPair, client: Client, strategy: Box) -> Self { let (sender, receiver) = channel(1); let manager = PositionManager::new(receiver, pair, client, strategy); tokio::spawn(PositionManagerHandle::run_position_manager(manager)); Self { sender } } pub async fn update(&mut self, tick: u64) -> Result { let (send, recv) = oneshot::channel(); self.sender .send(ActorMessage { message: ActionMessage::Update { tick }, respond_to: send, }) .await?; let response = recv.await?; Ok(response) } } #[derive(Debug)] pub struct PositionManager { receiver: Receiver, current_tick: u64, pair: SymbolPair, positions_history: HashMap, active_position: Option, client: Client, strategy: Box, } impl PositionManager { pub fn new( receiver: Receiver, pair: SymbolPair, client: Client, strategy: Box, ) -> Self { PositionManager { receiver, current_tick: 0, pair, positions_history: HashMap::new(), active_position: None, client, strategy, } } pub fn current_tick(&self) -> u64 { self.current_tick } pub async fn handle_message(&mut self, msg: ActorMessage) -> Result<(), BoxError> { let (events, messages) = match msg.message { ActionMessage::Update { tick } => self.update(tick).await?, _ => (None, None), }; Ok(msg .respond_to .send((events, messages)) .map_err(|_| BoxError::from("Could not send message."))?) } pub async fn update(&mut self, tick: u64) -> Result { trace!("\t[PositionManager] Updating {}", self.pair); self.current_tick = tick; let (fees, opt_active_positions) = tokio::join!(self.client.trading_fees(),self.client.active_positions(&self.pair)); let (fees, opt_active_positions) = (fees?, opt_active_positions?); // we assume there is only ONE active position per pair match opt_active_positions { // no open positions, no events and no messages returned None => return Ok((None, None)), Some(positions) => { // checking if there are positions open for our pair match positions.into_iter().find(|x| x.pair() == &self.pair) { // no open positions for our pair, setting active position to none None => { self.active_position = None; return Ok((None, None)); } // applying strategy to open position and saving into struct Some(position) => { let mut events = None; let mut messages = None; let (pos_on_tick, events_on_tick, messages_on_tick) = self .strategy .on_tick(position, self.current_tick(), &self.positions_history, &fees); let (pos_post_tick, events_post_tick, messages_post_tick) = self .strategy .post_tick(pos_on_tick, self.current_tick(), &self.positions_history, &fees); events.merge(events_on_tick); events.merge(events_post_tick); messages.merge(messages_on_tick); messages.merge(messages_post_tick); self.positions_history .insert(self.current_tick(), pos_post_tick.clone()); self.active_position = Some(pos_post_tick); return Ok((events, messages)); } } } }; } } /****************** * ORDERS ******************/ pub struct OrderManagerHandle { sender: Sender, } impl OrderManagerHandle { const SLEEP_DURATION: u64 = 5; async fn run_order_manager(mut manager: OrderManager) { let mut sleep = tokio::time::interval(Duration::from_secs(OrderManagerHandle::SLEEP_DURATION)); loop { tokio::select! { opt_msg = manager.receiver.recv() => { if let Some(msg) = opt_msg { manager.handle_message(msg).await.unwrap() } }, _ = sleep.tick() => { manager.update().await.unwrap(); } } } } pub fn new(pair: SymbolPair, client: Client) -> Self { let (sender, receiver) = channel(1); let manager = OrderManager::new(receiver, pair, client); tokio::spawn(OrderManagerHandle::run_order_manager(manager)); Self { sender } } pub async fn close_position(&mut self, position_id: u64) -> Result { let (send, recv) = oneshot::channel(); self.sender .send(ActorMessage { message: ActionMessage::ClosePosition { position_id }, respond_to: send, }) .await?; Ok(recv.await?) } pub async fn close_position_orders( &mut self, position_id: u64, ) -> Result { let (send, recv) = oneshot::channel(); self.sender .send(ActorMessage { message: ActionMessage::ClosePositionOrders { position_id }, respond_to: send, }) .await?; Ok(recv.await?) } pub async fn submit_order(&mut self, order_form: OrderForm) -> Result { let (send, recv) = oneshot::channel(); self.sender .send(ActorMessage { message: ActionMessage::SubmitOrder { order: order_form }, respond_to: send, }) .await?; Ok(recv.await?) } } pub struct OrderManager { receiver: Receiver, orders_map: HashMap>, pair: SymbolPair, client: Client, } impl OrderManager { pub fn new( receiver: Receiver, pair: SymbolPair, client: Client, ) -> Self { OrderManager { receiver, pair, client, orders_map: Default::default(), } } /* * PRIVATE METHODS */ fn add_to_orders_map(&mut self, position_id: u64, order: ActiveOrder) -> bool { self.orders_map .entry(position_id) .or_default() .insert(order) } fn orders_from_position_id(&self, position_id: u64) -> Option<&HashSet> { self.orders_map.get(&position_id) } fn all_tracked_orders(&self) -> Option> { let orders: Vec<_> = self.orders_map.values().flat_map(|x| x.clone()).collect(); (!orders.is_empty()).then_some(orders) } async fn update_orders_map_from_remote(&mut self) -> Result<(), BoxError> { let (res_remote_orders, res_remote_positions) = tokio::join!(self.client.active_orders(&self.pair), self.client.active_positions(&self.pair)); let (remote_orders, remote_positions) = (res_remote_orders?, res_remote_positions?); match remote_positions { // no positions open, clear internal mapping None => { self.orders_map.clear(); } Some(positions) => { // retain only positions that are open remotely as well self.orders_map.retain(|local_id, _| positions.iter().find(|r| r.id() == *local_id).is_some()); for position in positions { // mapping tracked orders to their ids let tracked_orders: Vec<_> = self.orders_from_position_id(position.id()) .iter() .flat_map(|x| x .iter() .map(|x| x.id())) .collect(); // adding remote order that are not in the internal mapping for remote_order in remote_orders.iter().filter(|x| !tracked_orders.contains(&x.id())) { // the only check to bind an active order to an open position, // is to check for their amount which should be identical if remote_order.order_form().amount().abs() == position.amount().abs() { trace!("Adding order {} to internal mapping from remote.", remote_order.id()); self.add_to_orders_map(position.id(), remote_order.clone()); } } // removing local orders that are not in remote for local_orders in self.orders_map.values_mut() { local_orders.retain(|l| remote_orders.iter().find(|r| r.id() == l.id()).is_some()); } // clean-up empty positions in local mapping let empty_positions_id: Vec<_> = self.orders_map .iter() .filter(|(_, orders)| orders.is_empty()) .map(|(&position, _)| position) .collect(); for position_id in empty_positions_id { self.orders_map.remove(&position_id); } } } } Ok(()) } /* * PUBLIC METHODS */ pub async fn handle_message(&mut self, msg: ActorMessage) -> Result<(), BoxError> { let (events, messages) = match msg.message { ActionMessage::Update { .. } => self.update().await?, ActionMessage::ClosePosition { position_id } => { self.close_position(position_id).await? } ActionMessage::ClosePositionOrders { position_id } => { self.close_position_orders(position_id).await? } ActionMessage::SubmitOrder { order } => self.submit_order(&order).await?, }; Ok(msg .respond_to .send((events, messages)) .map_err(|_| BoxError::from("Could not send message."))?) } pub async fn close_position_orders(&self, position_id: u64) -> Result { info!("Closing outstanding orders for position #{}", position_id); if let Some(position_orders) = self.orders_map.get(&position_id) { for order in position_orders { match self.client.cancel_order(order).await { Ok(_) => info!("Order #{} closed successfully.", order.id()), Err(e) => error!("Could not close order #{}: {}", order.id(), e), } } } // TODO: return valid messages and events! Ok((None, None)) } pub async fn submit_order(&mut self, order_form: &OrderForm) -> Result { info!("Submitting order: {}", order_form.kind()); // adding strategy to order, if present in the metadata let active_order = { if let Some(metadata) = order_form.metadata() { // TODO: this seems extremely dirty. Double check! self.client.submit_order(order_form).await?.with_strategy(metadata.cloned_strategy()) } else { self.client.submit_order(order_form).await? } }; if let Some(metadata) = order_form.metadata() { if let Some(position_id) = metadata.position_id() { debug!("Adding order to tracked orders."); if !self.add_to_orders_map(position_id, active_order) { error!("Failed while adding order to internal mapping."); }; } }; // TODO: return valid messages and events!111!!!1! Ok((None, None)) } pub async fn close_position(&mut self, position_id: u64) -> Result { info!("Closing position #{}", position_id); debug!("Retrieving open orders, positions and current prices..."); let (res_open_orders, res_order_book, res_open_positions) = tokio::join!( self.client.active_orders(&self.pair), self.client.order_book(&self.pair), self.client.active_positions(&self.pair) ); let (open_orders, order_book, open_positions) = (res_open_orders?, res_order_book?, res_open_positions?); // if there are open positions if let Some(open_positions) = open_positions { // if we find an open position with the ID we are looking for if let Some(position) = open_positions.into_iter().find(|x| x.id() == position_id) { let opt_position_order = open_orders .iter() // avoid using direct equality, using error margin instead .find(|x| { (x.order_form().amount().neg() - position.amount()).abs() < 0.0000001 }); // checking if the position has an open order. // If so, don't do anything since the order is taken care of // in the update phase. // If no order is open, send an undercut limit order at the best current price. if opt_position_order.is_none() { // No open order, undercutting best price with limit order let closing_price = self.best_closing_price(&position, &order_book); let order_form = OrderForm::new( self.pair.clone(), OrderKind::Limit { price: closing_price, }, position.platform(), position.amount().neg(), ) .with_leverage(Some(position.leverage())) .with_metadata(Some(OrderMetadata::new() .with_strategy(Some(Box::new(MarketEnforce::default()))) .with_position_id(Some(position.id()))) ); // submitting order if let Err(e) = self.submit_order(&order_form).await { error!( "Could not submit {} to close position #{}: {}", order_form.kind(), position.id(), e ); return Err(e); } } } } Ok((None, None)) } pub async fn update(&mut self) -> Result { debug!("\t[OrderManager] Updating {}", self.pair); // updating internal orders' mapping from remote self.update_orders_map_from_remote().await?; // calling strategies for the orders and collecting resulting messages let _orders_messages: HashMap<&ActiveOrder, Vec> = HashMap::new(); if let Some(tracked_orders) = self.all_tracked_orders() { // since there are open orders, retrieve order book let order_book = self.client.order_book(&self.pair).await?; for active_order in tracked_orders.iter().filter(|x| x.strategy().is_some()) { let strategy = active_order.strategy().as_ref().unwrap(); trace!( "Found open order with \"{}\" strategy.", strategy.name() ); // executing the order's strategy and collecting its messages, if any let (_, strat_messages) = strategy.on_open_order(&active_order, &order_book)?; if let Some(messages) = strat_messages { for m in messages { match m { ActionMessage::SubmitOrder { order: order_form } => { info!("Closing open order..."); info!("\tCancelling open order #{}", &active_order.id()); self.client.cancel_order(&active_order).await?; info!("\tSubmitting {}...", order_form.kind()); self.submit_order(&order_form).await?; info!("Done!"); } _ => { debug!( "Received unsupported message from order strategy. Unimplemented." ) } } } } } } Ok((None, None)) } pub fn best_closing_price(&self, position: &Position, order_book: &OrderBook) -> f64 { let ask = order_book.lowest_ask(); let bid = order_book.highest_bid(); let avg = (bid + ask) / 2.0; let delta = (ask - bid) / 10.0; let closing_price = { if position.is_short() { bid - delta } else { ask + delta } }; if avg > 9999.0 { if position.is_short() { closing_price.ceil() } else { closing_price.floor() } } else { closing_price } } } pub struct PairManager { pair: SymbolPair, price_manager: PriceManagerHandle, order_manager: OrderManagerHandle, position_manager: PositionManagerHandle, } impl PairManager { pub fn new(pair: SymbolPair, client: Client) -> Self { Self { pair: pair.clone(), price_manager: PriceManagerHandle::new(pair.clone(), client.clone()), order_manager: OrderManagerHandle::new( pair.clone(), client.clone(), ), position_manager: PositionManagerHandle::new( pair, client, Box::new(TrailingStop::default()), ), } } pub async fn update_managers(&mut self, tick: u64) -> Result<(), BoxError> { let mut events = None; let mut messages = None; let (price_results, pos_results) = tokio::join!( self.price_manager.update(tick), self.position_manager.update(tick), ); let (opt_price_events, opt_price_messages) = price_results?; let (opt_pos_events, opt_pos_messages) = pos_results?; events.merge(opt_price_events); events.merge(opt_pos_events); messages.merge(opt_price_messages); messages.merge(opt_pos_messages); // TODO: to move into Handler? if let Some(messages) = messages { for m in messages { match m { ActionMessage::Update { .. } => {} ActionMessage::ClosePosition { position_id } => { self.order_manager.close_position(position_id).await?; } ActionMessage::SubmitOrder { order } => { self.order_manager.submit_order(order).await?; } ActionMessage::ClosePositionOrders { position_id } => { self.order_manager .close_position_orders(position_id) .await?; } } } } Ok(()) } } pub struct ExchangeManager { kind: ExchangeDetails, pair_managers: Vec, } impl ExchangeManager { pub fn new(kind: &ExchangeDetails, pairs: &[SymbolPair]) -> Self { let client = Client::new(kind); let pair_managers = pairs .iter() .map(|x| PairManager::new(x.clone(), client.clone())) .collect(); Self { kind: kind.clone(), pair_managers, } } pub async fn update_managers(&mut self, tick: u64) -> Result<(), BoxError> { let mut futures: FuturesUnordered<_> = self .pair_managers .iter_mut() .map(|x| x.update_managers(tick)) .collect(); // execute the futures while futures.next().await.is_some() {} Ok(()) } }