revamped order tracking in order manager
This commit is contained in:
parent
551ca054c6
commit
f4d7786e03
212
src/managers.rs
212
src/managers.rs
@ -1,4 +1,4 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::{HashMap, HashSet};
|
||||||
use std::ops::Neg;
|
use std::ops::Neg;
|
||||||
|
|
||||||
use futures_util::stream::FuturesUnordered;
|
use futures_util::stream::FuturesUnordered;
|
||||||
@ -384,9 +384,9 @@ impl OrderManagerHandle {
|
|||||||
|
|
||||||
pub struct OrderManager {
|
pub struct OrderManager {
|
||||||
receiver: Receiver<ActorMessage>,
|
receiver: Receiver<ActorMessage>,
|
||||||
|
orders_map: HashMap<u64, HashSet<ActiveOrder>>,
|
||||||
tracked_positions: TrackedPositionsMap,
|
tracked_positions: TrackedPositionsMap,
|
||||||
pair: SymbolPair,
|
pair: SymbolPair,
|
||||||
open_orders: Vec<ActiveOrder>,
|
|
||||||
client: Client,
|
client: Client,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -399,12 +399,90 @@ impl OrderManager {
|
|||||||
OrderManager {
|
OrderManager {
|
||||||
receiver,
|
receiver,
|
||||||
pair,
|
pair,
|
||||||
open_orders: Vec::new(),
|
|
||||||
client,
|
client,
|
||||||
tracked_positions: HashMap::new(),
|
tracked_positions: Default::default(),
|
||||||
|
orders_map: Default::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PRIVATE METHODS
|
||||||
|
*/
|
||||||
|
|
||||||
|
fn add_to_orders_map(&mut self, position_id: u64, order: ActiveOrder) -> bool {
|
||||||
|
self.orders_map
|
||||||
|
.entry(position_id)
|
||||||
|
.or_default()
|
||||||
|
.insert(order)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn orders_from_position_id(&self, position_id: u64) -> Option<&HashSet<ActiveOrder>> {
|
||||||
|
self.orders_map.get(&position_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn all_tracked_orders(&self) -> Option<Vec<ActiveOrder>> {
|
||||||
|
let orders: Vec<_> = self.orders_map.values().flat_map(|x| x.clone()).collect();
|
||||||
|
|
||||||
|
(!orders.is_empty()).then_some(orders)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_orders_map_from_remote(&mut self) -> Result<(), BoxError> {
|
||||||
|
let (res_remote_orders, res_remote_positions) = tokio::join!(self.client.active_orders(&self.pair),
|
||||||
|
self.client.active_positions(&self.pair));
|
||||||
|
let (remote_orders, remote_positions) = (res_remote_orders?, res_remote_positions?);
|
||||||
|
|
||||||
|
match remote_positions {
|
||||||
|
// no positions open, clear internal mapping
|
||||||
|
None => { self.orders_map.clear(); }
|
||||||
|
Some(positions) => {
|
||||||
|
// retain only positions that are open remotely as well
|
||||||
|
self.orders_map.retain(|local_id, _| positions.iter().find(|r| r.id() == *local_id).is_some());
|
||||||
|
|
||||||
|
for position in positions {
|
||||||
|
// mapping tracked orders to their ids
|
||||||
|
let tracked_orders: Vec<_> = self.orders_from_position_id(position.id())
|
||||||
|
.iter()
|
||||||
|
.flat_map(|x| x
|
||||||
|
.iter()
|
||||||
|
.map(|x| x.id()))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// adding remote order that are not in the internal mapping
|
||||||
|
for remote_order in remote_orders.iter().filter(|x| !tracked_orders.contains(&x.id())) {
|
||||||
|
// the only check to bind an active order to an open position,
|
||||||
|
// is to check for their amount which should be identical
|
||||||
|
if remote_order.order_form().amount().abs() == position.amount().abs() {
|
||||||
|
trace!("Adding order {} to internal mapping from remote.", remote_order.id());
|
||||||
|
self.add_to_orders_map(position.id(), remote_order.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// removing local orders that are not in remote
|
||||||
|
for local_orders in self.orders_map.values_mut() {
|
||||||
|
local_orders.retain(|l| remote_orders.iter().find(|r| r.id() == l.id()).is_some());
|
||||||
|
}
|
||||||
|
|
||||||
|
// clean-up empty positions in local mapping
|
||||||
|
let empty_positions_id: Vec<_> = self.orders_map
|
||||||
|
.iter()
|
||||||
|
.filter(|(_, orders)| orders.is_empty())
|
||||||
|
.map(|(&position, _)| position)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
for position_id in empty_positions_id {
|
||||||
|
self.orders_map.remove(&position_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PUBLIC METHODS
|
||||||
|
*/
|
||||||
|
|
||||||
pub async fn handle_message(&mut self, msg: ActorMessage) -> Result<(), BoxError> {
|
pub async fn handle_message(&mut self, msg: ActorMessage) -> Result<(), BoxError> {
|
||||||
let (events, messages) = match msg.message {
|
let (events, messages) = match msg.message {
|
||||||
ActionMessage::Update { .. } => self.update().await?,
|
ActionMessage::Update { .. } => self.update().await?,
|
||||||
@ -426,14 +504,7 @@ impl OrderManager {
|
|||||||
pub async fn close_position_orders(&self, position_id: u64) -> Result<OptionUpdate, BoxError> {
|
pub async fn close_position_orders(&self, position_id: u64) -> Result<OptionUpdate, BoxError> {
|
||||||
info!("Closing outstanding orders for position #{}", position_id);
|
info!("Closing outstanding orders for position #{}", position_id);
|
||||||
|
|
||||||
if let Some(position_orders) = self.tracked_positions.get(&position_id) {
|
if let Some(position_orders) = self.orders_map.get(&position_id) {
|
||||||
// retrieving open orders
|
|
||||||
let open_orders = self.client.active_orders(&self.pair).await?;
|
|
||||||
let position_orders: Vec<_> = position_orders
|
|
||||||
.iter()
|
|
||||||
.filter_map(|&x| open_orders.iter().find(|y| y.id() == x))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
for order in position_orders {
|
for order in position_orders {
|
||||||
match self.client.cancel_order(order).await {
|
match self.client.cancel_order(order).await {
|
||||||
Ok(_) => info!("Order #{} closed successfully.", order.id()),
|
Ok(_) => info!("Order #{} closed successfully.", order.id()),
|
||||||
@ -459,18 +530,13 @@ impl OrderManager {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
debug!("Adding order to tracked orders.");
|
|
||||||
if let Some(metadata) = order_form.metadata() {
|
if let Some(metadata) = order_form.metadata() {
|
||||||
if let Some(position_id) = metadata.position_id() {
|
if let Some(position_id) = metadata.position_id() {
|
||||||
match self.tracked_positions.get_mut(&position_id) {
|
debug!("Adding order to tracked orders.");
|
||||||
None => {
|
|
||||||
self.tracked_positions
|
if !self.add_to_orders_map(position_id, active_order) {
|
||||||
.insert(position_id, vec![active_order.id()]);
|
error!("Failed while adding order to internal mapping.");
|
||||||
}
|
};
|
||||||
Some(position_orders) => {
|
|
||||||
position_orders.push(active_order.id());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -544,47 +610,43 @@ impl OrderManager {
|
|||||||
pub async fn update(&mut self) -> Result<OptionUpdate, BoxError> {
|
pub async fn update(&mut self) -> Result<OptionUpdate, BoxError> {
|
||||||
debug!("\t[OrderManager] Updating {}", self.pair);
|
debug!("\t[OrderManager] Updating {}", self.pair);
|
||||||
|
|
||||||
let (res_open_orders, res_order_book) = tokio::join!(
|
// updating internal orders' mapping from remote
|
||||||
self.client.active_orders(&self.pair),
|
self.update_orders_map_from_remote().await?;
|
||||||
self.client.order_book(&self.pair)
|
|
||||||
);
|
|
||||||
|
|
||||||
let (open_orders, order_book) = (res_open_orders?, res_order_book?);
|
// calling strategies for the orders and collecting resulting messages
|
||||||
|
let _orders_messages: HashMap<&ActiveOrder, Vec<ActionMessage>> = HashMap::new();
|
||||||
|
|
||||||
// retrieving open positions to check whether the positions have open orders.
|
if let Some(tracked_orders) = self.all_tracked_orders() {
|
||||||
// we need to update our internal mapping in that case.
|
// since there are open orders, retrieve order book
|
||||||
if !open_orders.is_empty() {
|
let order_book = self.client.order_book(&self.pair).await?;
|
||||||
let open_positions = self.client.active_positions(&self.pair).await?;
|
|
||||||
|
|
||||||
if let Some(positions) = open_positions {
|
for active_order in tracked_orders.iter().filter(|x| x.strategy().is_some()) {
|
||||||
// currently, we are only trying to match orders with an amount equal to
|
let strategy = active_order.strategy().as_ref().unwrap();
|
||||||
// a position amount.
|
|
||||||
for position in positions {
|
|
||||||
let matching_order = open_orders
|
|
||||||
.iter()
|
|
||||||
.find(|x| x.order_form().amount().abs() == position.amount().abs());
|
|
||||||
|
|
||||||
// if an order is found, we insert the order to our internal mapping, if not already present
|
trace!(
|
||||||
if let Some(matching_order) = matching_order {
|
"Found open order with \"{}\" strategy.",
|
||||||
match self.tracked_positions.get_mut(&position.id()) {
|
strategy.name()
|
||||||
Some(position_orders) => {
|
);
|
||||||
if !position_orders.contains(&matching_order.id()) {
|
|
||||||
trace!(
|
// executing the order's strategy and collecting its messages, if any
|
||||||
"Mapped order #{} to position #{}",
|
let (_, strat_messages) = strategy.on_open_order(&active_order, &order_book)?;
|
||||||
position.id(),
|
|
||||||
matching_order.id()
|
if let Some(messages) = strat_messages {
|
||||||
);
|
for m in messages {
|
||||||
position_orders.push(matching_order.id());
|
match m {
|
||||||
}
|
ActionMessage::SubmitOrder { order: order_form } => {
|
||||||
|
info!("Closing open order...");
|
||||||
|
info!("\tCancelling open order #{}", &active_order.id());
|
||||||
|
self.client.cancel_order(&active_order).await?;
|
||||||
|
|
||||||
|
info!("\tSubmitting {}...", order_form.kind());
|
||||||
|
self.submit_order(&order_form).await?;
|
||||||
|
info!("Done!");
|
||||||
}
|
}
|
||||||
None => {
|
_ => {
|
||||||
trace!(
|
debug!(
|
||||||
"Mapped order #{} to position #{}",
|
"Received unsupported message from order strategy. Unimplemented."
|
||||||
position.id(),
|
)
|
||||||
matching_order.id()
|
|
||||||
);
|
|
||||||
self.tracked_positions
|
|
||||||
.insert(position.id(), vec![matching_order.id()]);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -592,42 +654,10 @@ impl OrderManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: this syntax can be simplified
|
|
||||||
for active_order in open_orders.iter().filter(|x| x.strategy().is_some()) {
|
|
||||||
let strategy = active_order.strategy().as_ref().unwrap();
|
|
||||||
|
|
||||||
trace!(
|
|
||||||
"Found open order with \"{}\" strategy.",
|
|
||||||
strategy.name()
|
|
||||||
);
|
|
||||||
|
|
||||||
let (_, strat_messages) = strategy.on_open_order(&active_order, &order_book)?;
|
|
||||||
|
|
||||||
if let Some(messages) = strat_messages {
|
|
||||||
for m in messages {
|
|
||||||
match m {
|
|
||||||
ActionMessage::SubmitOrder { order: order_form } => {
|
|
||||||
info!("Closing open order...");
|
|
||||||
info!("\tCancelling open order #{}", &active_order.id());
|
|
||||||
self.client.cancel_order(&active_order).await?;
|
|
||||||
|
|
||||||
info!("\tSubmitting {}...", order_form.kind());
|
|
||||||
self.submit_order(&order_form).await?;
|
|
||||||
info!("Done!");
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
debug!(
|
|
||||||
"Received unsupported message from order strategy. Unimplemented."
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok((None, None))
|
Ok((None, None))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub fn best_closing_price(&self, position: &Position, order_book: &OrderBook) -> f64 {
|
pub fn best_closing_price(&self, position: &Position, order_book: &OrderBook) -> f64 {
|
||||||
let ask = order_book.lowest_ask();
|
let ask = order_book.lowest_ask();
|
||||||
let bid = order_book.highest_bid();
|
let bid = order_book.highest_bid();
|
||||||
|
Loading…
Reference in New Issue
Block a user