use std::collections::HashMap; use std::ops::Neg; use futures_util::stream::FuturesUnordered; use futures_util::StreamExt; use log::{debug, error, info, trace}; use merge::Merge; use tokio::sync::mpsc::channel; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::oneshot; use tokio::time::Duration; use crate::connectors::{Client, ExchangeDetails}; use crate::currency::SymbolPair; use crate::events::{ActionMessage, ActorMessage, Event}; use crate::models::{ ActiveOrder, OrderBook, OrderForm, OrderKind, Position, PriceTicker, }; use crate::strategy::{HiddenTrailingStop, MarketEnforce, OrderStrategy, PositionStrategy}; use crate::BoxError; pub type OptionUpdate = (Option>, Option>); /****************** * PRICES ******************/ #[derive(Debug)] pub struct PriceManager { receiver: Receiver, pair: SymbolPair, prices: Vec, client: Client, } impl PriceManager { pub fn new(receiver: Receiver, pair: SymbolPair, client: Client) -> Self { PriceManager { receiver, pair, prices: Vec::new(), client, } } pub async fn handle_message(&mut self, message: ActorMessage) -> Result<(), BoxError> { if let ActionMessage::Update { tick } = message.message { let a = self.update(tick).await?; self.add_entry(a); } Ok(message .respond_to .send((None, None)) .map_err(|_| BoxError::from("Could not send message."))?) } pub fn add_entry(&mut self, entry: PriceEntry) { self.prices.push(entry); } pub async fn update(&mut self, tick: u64) -> Result { let current_prices = self.client.current_prices(&self.pair).await?.into(); Ok(PriceEntry::new( tick, current_prices, self.pair.clone(), None, )) } pub fn pair(&self) -> &SymbolPair { &self.pair } } pub struct PriceManagerHandle { sender: Sender, } impl PriceManagerHandle { async fn run_price_manager(mut manager: PriceManager) { while let Some(msg) = manager.receiver.recv().await { manager.handle_message(msg).await.unwrap(); } } pub fn new(pair: SymbolPair, client: Client) -> Self { let (sender, receiver) = channel(1); let price_manager = PriceManager::new(receiver, pair, client); tokio::spawn(PriceManagerHandle::run_price_manager(price_manager)); Self { sender } } pub async fn update(&mut self, tick: u64) -> Result { let (send, recv) = oneshot::channel(); self.sender .send(ActorMessage { message: ActionMessage::Update { tick }, respond_to: send, }) .await?; Ok(recv.await?) } } #[derive(Clone, Debug)] pub struct PriceEntry { tick: u64, pair: SymbolPair, price: PriceTicker, events: Option>, } impl PriceEntry { pub fn new( tick: u64, price: PriceTicker, pair: SymbolPair, events: Option>, ) -> Self { PriceEntry { tick, pair, price, events, } } pub fn tick(&self) -> u64 { self.tick } pub fn pair(&self) -> &SymbolPair { &self.pair } pub fn price(&self) -> PriceTicker { self.price } pub fn events(&self) -> &Option> { &self.events } } /****************** * POSITIONS ******************/ pub struct PositionManagerHandle { sender: Sender, } impl PositionManagerHandle { async fn run_position_manager(mut manager: PositionManager) { while let Some(msg) = manager.receiver.recv().await { manager.handle_message(msg).await.unwrap(); } } pub fn new(pair: SymbolPair, client: Client, strategy: Box) -> Self { let (sender, receiver) = channel(1); let manager = PositionManager::new(receiver, pair, client, strategy); tokio::spawn(PositionManagerHandle::run_position_manager(manager)); Self { sender } } pub async fn update(&mut self, tick: u64) -> Result { let (send, recv) = oneshot::channel(); self.sender .send(ActorMessage { message: ActionMessage::Update { tick }, respond_to: send, }) .await?; let response = recv.await?; Ok(response) } } #[derive(Debug)] pub struct PositionManager { receiver: Receiver, current_tick: u64, pair: SymbolPair, positions_history: HashMap, active_position: Option, client: Client, strategy: Box, } impl PositionManager { pub fn new( receiver: Receiver, pair: SymbolPair, client: Client, strategy: Box, ) -> Self { PositionManager { receiver, current_tick: 0, pair, positions_history: HashMap::new(), active_position: None, client, strategy, } } pub fn current_tick(&self) -> u64 { self.current_tick } pub async fn handle_message(&mut self, msg: ActorMessage) -> Result<(), BoxError> { let (events, messages) = match msg.message { ActionMessage::Update { tick } => self.update(tick).await?, _ => (None, None), }; Ok(msg .respond_to .send((events, messages)) .map_err(|_| BoxError::from("Could not send message."))?) } pub async fn update(&mut self, tick: u64) -> Result { trace!("\t[PositionManager] Updating {}", self.pair); let opt_active_positions = self.client.active_positions(&self.pair).await?; self.current_tick = tick; // we assume there is only ONE active position per pair match opt_active_positions { // no open positions, no events and no messages returned None => return Ok((None, None)), Some(positions) => { // checking if there are positions open for our pair match positions.into_iter().find(|x| x.pair() == &self.pair) { // no open positions for our pair, setting active position to none None => { self.active_position = None; return Ok((None, None)); } // applying strategy to open position and saving into struct Some(position) => { let mut events = None; let mut messages = None; let (pos_on_tick, events_on_tick, messages_on_tick) = self .strategy .on_tick(position, self.current_tick(), &self.positions_history); let (pos_post_tick, events_post_tick, messages_post_tick) = self .strategy .post_tick(pos_on_tick, self.current_tick(), &self.positions_history); events.merge(events_on_tick); events.merge(events_post_tick); messages.merge(messages_on_tick); messages.merge(messages_post_tick); self.positions_history .insert(self.current_tick(), pos_post_tick.clone()); self.active_position = Some(pos_post_tick); return Ok((events, messages)); } } } }; } pub fn position_previous_tick(&self, id: u64, tick: Option) -> Option<&Position> { let tick = match tick { Some(tick) => { if tick < 1 { 1 } else { tick } } None => self.current_tick() - 1, }; self.positions_history.get(&tick).filter(|x| x.id() == id) } } /****************** * ORDERS ******************/ // Position ID: Order ID pub type TrackedPositionsMap = HashMap>; pub struct OrderManagerHandle { sender: Sender, } impl OrderManagerHandle { const SLEEP_DURATION: u64 = 5; async fn run_order_manager(mut manager: OrderManager) { let mut sleep = tokio::time::interval(Duration::from_secs(OrderManagerHandle::SLEEP_DURATION)); loop { tokio::select! { opt_msg = manager.receiver.recv() => { if let Some(msg) = opt_msg { manager.handle_message(msg).await.unwrap() } }, _ = sleep.tick() => { manager.update().await.unwrap(); } } } } pub fn new(pair: SymbolPair, client: Client, strategy: Box) -> Self { let (sender, receiver) = channel(1); let manager = OrderManager::new(receiver, pair, client, strategy); tokio::spawn(OrderManagerHandle::run_order_manager(manager)); Self { sender } } pub async fn close_position(&mut self, position_id: u64) -> Result { let (send, recv) = oneshot::channel(); self.sender .send(ActorMessage { message: ActionMessage::ClosePosition { position_id }, respond_to: send, }) .await?; Ok(recv.await?) } pub async fn close_position_orders( &mut self, position_id: u64, ) -> Result { let (send, recv) = oneshot::channel(); self.sender .send(ActorMessage { message: ActionMessage::ClosePositionOrders { position_id }, respond_to: send, }) .await?; Ok(recv.await?) } pub async fn submit_order(&mut self, order_form: OrderForm) -> Result { let (send, recv) = oneshot::channel(); self.sender .send(ActorMessage { message: ActionMessage::SubmitOrder { order: order_form }, respond_to: send, }) .await?; Ok(recv.await?) } } pub struct OrderManager { receiver: Receiver, tracked_positions: TrackedPositionsMap, pair: SymbolPair, open_orders: Vec, client: Client, strategy: Box, } impl OrderManager { pub fn new( receiver: Receiver, pair: SymbolPair, client: Client, strategy: Box, ) -> Self { OrderManager { receiver, pair, open_orders: Vec::new(), client, strategy, tracked_positions: HashMap::new(), } } pub async fn handle_message(&mut self, msg: ActorMessage) -> Result<(), BoxError> { let (events, messages) = match msg.message { ActionMessage::Update { .. } => self.update().await?, ActionMessage::ClosePosition { position_id } => { self.close_position(position_id).await? } ActionMessage::ClosePositionOrders { position_id } => { self.close_position_orders(position_id).await? } ActionMessage::SubmitOrder { order } => self.submit_order(&order).await?, }; Ok(msg .respond_to .send((events, messages)) .map_err(|_| BoxError::from("Could not send message."))?) } pub async fn close_position_orders(&self, position_id: u64) -> Result { info!("Closing outstanding orders for position #{}", position_id); if let Some(position_orders) = self.tracked_positions.get(&position_id) { // retrieving open orders let open_orders = self.client.active_orders(&self.pair).await?; let position_orders: Vec<_> = position_orders .iter() .filter_map(|&x| open_orders.iter().find(|y| y.id() == x)) .collect(); for order in position_orders { match self.client.cancel_order(order).await { Ok(_) => info!("Order #{} closed successfully.", order.id()), Err(e) => error!("Could not close order #{}: {}", order.id(), e), } } } // TODO: return valid messages and events! Ok((None, None)) } pub async fn submit_order(&mut self, order_form: &OrderForm) -> Result { info!("Submiting {}", order_form.kind()); let active_order = self.client.submit_order(order_form).await?; debug!("Adding order to tracked orders."); if let Some(metadata) = order_form.metadata() { if let Some(position_id) = metadata.position_id() { match self.tracked_positions.get_mut(&position_id) { None => { self.tracked_positions .insert(position_id, vec![active_order.id()]); } Some(position_orders) => { position_orders.push(active_order.id()); } } } }; // TODO: return valid messages and events!111!!!1! Ok((None, None)) } pub async fn close_position(&mut self, position_id: u64) -> Result { info!("Closing position #{}", position_id); debug!("Retrieving open orders, positions and current prices..."); let (res_open_orders, res_order_book, res_open_positions) = tokio::join!( self.client.active_orders(&self.pair), self.client.order_book(&self.pair), self.client.active_positions(&self.pair) ); let (open_orders, order_book, open_positions) = (res_open_orders?, res_order_book?, res_open_positions?); // if there are open positions if let Some(open_positions) = open_positions { // if we find an open position with the ID we are looking for if let Some(position) = open_positions.into_iter().find(|x| x.id() == position_id) { let opt_position_order = open_orders .iter() // avoid using direct equality, using error margin instead .find(|x| { (x.order_form().amount().neg() - position.amount()).abs() < 0.0000001 }); // checking if the position has an open order. // If so, don't do anything since the order is taken care of // in the update phase. // If no order is open, send an undercut limit order at the best current price. if opt_position_order.is_none() { // No open order, undercutting best price with limit order let closing_price = self.best_closing_price(&position, &order_book); let order_form = OrderForm::new( self.pair.clone(), OrderKind::Limit { price: closing_price, }, position.platform(), position.amount().neg(), ) .with_leverage(Some(position.leverage())); info!("Submitting {} order", order_form.kind()); if let Err(e) = self.client.submit_order(&order_form).await { error!( "Could not submit {} to close position #{}: {}", order_form.kind(), position.id(), e ); return Err(e); } } } } Ok((None, None)) } pub async fn update(&mut self) -> Result { debug!("\t[OrderManager] Updating {}", self.pair); let (res_open_orders, res_order_book) = tokio::join!( self.client.active_orders(&self.pair), self.client.order_book(&self.pair) ); let (open_orders, order_book) = (res_open_orders?, res_order_book?); // retrieving open positions to check whether the positions have open orders. // we need to update our internal mapping in that case. if !open_orders.is_empty() { let open_positions = self.client.active_positions(&self.pair).await?; if let Some(positions) = open_positions { // currently, we are only trying to match orders with an amount equal to // a position amount. for position in positions { let matching_order = open_orders .iter() .find(|x| x.order_form().amount().abs() == position.amount().abs()); // if an order is found, we insert the order to our internal mapping, if not already present if let Some(matching_order) = matching_order { match self.tracked_positions.get_mut(&position.id()) { Some(position_orders) => { if !position_orders.contains(&matching_order.id()) { trace!( "Mapped order #{} to position #{}", position.id(), matching_order.id() ); position_orders.push(matching_order.id()); } } None => { trace!( "Mapped order #{} to position #{}", position.id(), matching_order.id() ); self.tracked_positions .insert(position.id(), vec![matching_order.id()]); } } } } } } for active_order in open_orders { trace!( "Found open order, calling \"{}\" strategy.", self.strategy.name() ); let (_, strat_messages) = self.strategy.on_open_order(&active_order, &order_book)?; if let Some(messages) = strat_messages { for m in messages { match m { ActionMessage::SubmitOrder { order: order_form } => { info!("Closing open order..."); info!("\tCancelling open order #{}", &active_order.id()); self.client.cancel_order(&active_order).await?; info!("\tSubmitting {}...", order_form.kind()); self.client.submit_order(&order_form).await?; info!("Done!"); } _ => { debug!( "Received unsupported message from order strategy. Unimplemented." ) } } } } } Ok((None, None)) } pub fn best_closing_price(&self, position: &Position, order_book: &OrderBook) -> f64 { let ask = order_book.lowest_ask(); let bid = order_book.highest_bid(); let avg = (bid + ask) / 2.0; let delta = (ask - bid) / 10.0; let closing_price = { if position.is_short() { bid - delta } else { ask + delta } }; if avg > 9999.0 { if position.is_short() { closing_price.ceil() } else { closing_price.floor() } } else { closing_price } } } pub struct PairManager { pair: SymbolPair, price_manager: PriceManagerHandle, order_manager: OrderManagerHandle, position_manager: PositionManagerHandle, } impl PairManager { pub fn new(pair: SymbolPair, client: Client) -> Self { Self { pair: pair.clone(), price_manager: PriceManagerHandle::new(pair.clone(), client.clone()), order_manager: OrderManagerHandle::new( pair.clone(), client.clone(), Box::new(MarketEnforce::default()), ), position_manager: PositionManagerHandle::new( pair, client, Box::new(HiddenTrailingStop::default()), ), } } pub async fn update_managers(&mut self, tick: u64) -> Result<(), BoxError> { let mut events = None; let mut messages = None; let (price_results, pos_results) = tokio::join!( self.price_manager.update(tick), self.position_manager.update(tick), ); let (opt_price_events, opt_price_messages) = price_results?; let (opt_pos_events, opt_pos_messages) = pos_results?; events.merge(opt_price_events); events.merge(opt_pos_events); messages.merge(opt_price_messages); messages.merge(opt_pos_messages); // TODO: to move into Handler? if let Some(messages) = messages { for m in messages { match m { ActionMessage::Update { .. } => {} ActionMessage::ClosePosition { position_id } => { self.order_manager.close_position(position_id).await?; } ActionMessage::SubmitOrder { order } => { self.order_manager.submit_order(order).await?; } ActionMessage::ClosePositionOrders { position_id } => { self.order_manager .close_position_orders(position_id) .await?; } } } } Ok(()) } } pub struct ExchangeManager { kind: ExchangeDetails, pair_managers: Vec, } impl ExchangeManager { pub fn new(kind: &ExchangeDetails, pairs: &[SymbolPair]) -> Self { let client = Client::new(kind); let pair_managers = pairs .iter() .map(|x| PairManager::new(x.clone(), client.clone())) .collect(); Self { kind: kind.clone(), pair_managers, } } pub async fn update_managers(&mut self, tick: u64) -> Result<(), BoxError> { let mut futures: FuturesUnordered<_> = self .pair_managers .iter_mut() .map(|x| x.update_managers(tick)) .collect(); // execute the futures while futures.next().await.is_some() {} Ok(()) } }