diff --git a/src/managers.rs b/src/managers.rs index 4101d14..658c48c 100644 --- a/src/managers.rs +++ b/src/managers.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::ops::Neg; use futures_util::stream::FuturesUnordered; @@ -384,9 +384,9 @@ impl OrderManagerHandle { pub struct OrderManager { receiver: Receiver, + orders_map: HashMap>, tracked_positions: TrackedPositionsMap, pair: SymbolPair, - open_orders: Vec, client: Client, } @@ -399,12 +399,90 @@ impl OrderManager { OrderManager { receiver, pair, - open_orders: Vec::new(), client, - tracked_positions: HashMap::new(), + tracked_positions: Default::default(), + orders_map: Default::default(), } } + /* + * PRIVATE METHODS + */ + + fn add_to_orders_map(&mut self, position_id: u64, order: ActiveOrder) -> bool { + self.orders_map + .entry(position_id) + .or_default() + .insert(order) + } + + fn orders_from_position_id(&self, position_id: u64) -> Option<&HashSet> { + self.orders_map.get(&position_id) + } + + fn all_tracked_orders(&self) -> Option> { + let orders: Vec<_> = self.orders_map.values().flat_map(|x| x.clone()).collect(); + + (!orders.is_empty()).then_some(orders) + } + + async fn update_orders_map_from_remote(&mut self) -> Result<(), BoxError> { + let (res_remote_orders, res_remote_positions) = tokio::join!(self.client.active_orders(&self.pair), + self.client.active_positions(&self.pair)); + let (remote_orders, remote_positions) = (res_remote_orders?, res_remote_positions?); + + match remote_positions { + // no positions open, clear internal mapping + None => { self.orders_map.clear(); } + Some(positions) => { + // retain only positions that are open remotely as well + self.orders_map.retain(|local_id, _| positions.iter().find(|r| r.id() == *local_id).is_some()); + + for position in positions { + // mapping tracked orders to their ids + let tracked_orders: Vec<_> = self.orders_from_position_id(position.id()) + .iter() + .flat_map(|x| x + .iter() + .map(|x| x.id())) + .collect(); + + // adding remote order that are not in the internal mapping + for remote_order in remote_orders.iter().filter(|x| !tracked_orders.contains(&x.id())) { + // the only check to bind an active order to an open position, + // is to check for their amount which should be identical + if remote_order.order_form().amount().abs() == position.amount().abs() { + trace!("Adding order {} to internal mapping from remote.", remote_order.id()); + self.add_to_orders_map(position.id(), remote_order.clone()); + } + } + + // removing local orders that are not in remote + for local_orders in self.orders_map.values_mut() { + local_orders.retain(|l| remote_orders.iter().find(|r| r.id() == l.id()).is_some()); + } + + // clean-up empty positions in local mapping + let empty_positions_id: Vec<_> = self.orders_map + .iter() + .filter(|(_, orders)| orders.is_empty()) + .map(|(&position, _)| position) + .collect(); + + for position_id in empty_positions_id { + self.orders_map.remove(&position_id); + } + } + } + } + + Ok(()) + } + + /* + * PUBLIC METHODS + */ + pub async fn handle_message(&mut self, msg: ActorMessage) -> Result<(), BoxError> { let (events, messages) = match msg.message { ActionMessage::Update { .. } => self.update().await?, @@ -426,14 +504,7 @@ impl OrderManager { pub async fn close_position_orders(&self, position_id: u64) -> Result { info!("Closing outstanding orders for position #{}", position_id); - if let Some(position_orders) = self.tracked_positions.get(&position_id) { - // retrieving open orders - let open_orders = self.client.active_orders(&self.pair).await?; - let position_orders: Vec<_> = position_orders - .iter() - .filter_map(|&x| open_orders.iter().find(|y| y.id() == x)) - .collect(); - + if let Some(position_orders) = self.orders_map.get(&position_id) { for order in position_orders { match self.client.cancel_order(order).await { Ok(_) => info!("Order #{} closed successfully.", order.id()), @@ -459,18 +530,13 @@ impl OrderManager { } }; - debug!("Adding order to tracked orders."); if let Some(metadata) = order_form.metadata() { if let Some(position_id) = metadata.position_id() { - match self.tracked_positions.get_mut(&position_id) { - None => { - self.tracked_positions - .insert(position_id, vec![active_order.id()]); - } - Some(position_orders) => { - position_orders.push(active_order.id()); - } - } + debug!("Adding order to tracked orders."); + + if !self.add_to_orders_map(position_id, active_order) { + error!("Failed while adding order to internal mapping."); + }; } }; @@ -544,47 +610,43 @@ impl OrderManager { pub async fn update(&mut self) -> Result { debug!("\t[OrderManager] Updating {}", self.pair); - let (res_open_orders, res_order_book) = tokio::join!( - self.client.active_orders(&self.pair), - self.client.order_book(&self.pair) - ); + // updating internal orders' mapping from remote + self.update_orders_map_from_remote().await?; - let (open_orders, order_book) = (res_open_orders?, res_order_book?); + // calling strategies for the orders and collecting resulting messages + let _orders_messages: HashMap<&ActiveOrder, Vec> = HashMap::new(); - // retrieving open positions to check whether the positions have open orders. - // we need to update our internal mapping in that case. - if !open_orders.is_empty() { - let open_positions = self.client.active_positions(&self.pair).await?; + if let Some(tracked_orders) = self.all_tracked_orders() { + // since there are open orders, retrieve order book + let order_book = self.client.order_book(&self.pair).await?; - if let Some(positions) = open_positions { - // currently, we are only trying to match orders with an amount equal to - // a position amount. - for position in positions { - let matching_order = open_orders - .iter() - .find(|x| x.order_form().amount().abs() == position.amount().abs()); + for active_order in tracked_orders.iter().filter(|x| x.strategy().is_some()) { + let strategy = active_order.strategy().as_ref().unwrap(); - // if an order is found, we insert the order to our internal mapping, if not already present - if let Some(matching_order) = matching_order { - match self.tracked_positions.get_mut(&position.id()) { - Some(position_orders) => { - if !position_orders.contains(&matching_order.id()) { - trace!( - "Mapped order #{} to position #{}", - position.id(), - matching_order.id() - ); - position_orders.push(matching_order.id()); - } + trace!( + "Found open order with \"{}\" strategy.", + strategy.name() + ); + + // executing the order's strategy and collecting its messages, if any + let (_, strat_messages) = strategy.on_open_order(&active_order, &order_book)?; + + if let Some(messages) = strat_messages { + for m in messages { + match m { + ActionMessage::SubmitOrder { order: order_form } => { + info!("Closing open order..."); + info!("\tCancelling open order #{}", &active_order.id()); + self.client.cancel_order(&active_order).await?; + + info!("\tSubmitting {}...", order_form.kind()); + self.submit_order(&order_form).await?; + info!("Done!"); } - None => { - trace!( - "Mapped order #{} to position #{}", - position.id(), - matching_order.id() - ); - self.tracked_positions - .insert(position.id(), vec![matching_order.id()]); + _ => { + debug!( + "Received unsupported message from order strategy. Unimplemented." + ) } } } @@ -592,42 +654,10 @@ impl OrderManager { } } - // TODO: this syntax can be simplified - for active_order in open_orders.iter().filter(|x| x.strategy().is_some()) { - let strategy = active_order.strategy().as_ref().unwrap(); - - trace!( - "Found open order with \"{}\" strategy.", - strategy.name() - ); - - let (_, strat_messages) = strategy.on_open_order(&active_order, &order_book)?; - - if let Some(messages) = strat_messages { - for m in messages { - match m { - ActionMessage::SubmitOrder { order: order_form } => { - info!("Closing open order..."); - info!("\tCancelling open order #{}", &active_order.id()); - self.client.cancel_order(&active_order).await?; - - info!("\tSubmitting {}...", order_form.kind()); - self.submit_order(&order_form).await?; - info!("Done!"); - } - _ => { - debug!( - "Received unsupported message from order strategy. Unimplemented." - ) - } - } - } - } - } - Ok((None, None)) } + pub fn best_closing_price(&self, position: &Position, order_book: &OrderBook) -> f64 { let ask = order_book.lowest_ask(); let bid = order_book.highest_bid();