order manager is an actor

This commit is contained in:
Giulio De Pasquale 2021-01-17 18:25:16 +00:00
parent 503c542a5f
commit 71273ccc78

View File

@ -293,8 +293,41 @@ impl PositionManager {
pub type TrackedPositionsMap = HashMap<u64, ExecutedOrder>; pub type TrackedPositionsMap = HashMap<u64, ExecutedOrder>;
pub struct OrderManagerHandle {
sender: Sender<SignalKind>,
}
impl OrderManagerHandle {
async fn run_order_manager(mut manager: OrderManager) {
while let Some(msg) = manager.receiver.recv().await {
manager.handle_message(msg).await.unwrap();
}
}
pub fn new(pair: SymbolPair, client: Client, strategy: Box<dyn OrderStrategy>) -> Self {
let (sender, receiver) = channel(8);
let manager = OrderManager::new(receiver, pair, client, strategy);
tokio::spawn(OrderManagerHandle::run_order_manager(manager));
Self { sender }
}
pub async fn update(&mut self, tick: u64) {
self.sender.send(SignalKind::Update(tick)).await.unwrap();
}
pub async fn close_position(&mut self, position: Position) {
self.sender
.send(SignalKind::ClosePosition(position))
.await
.unwrap();
}
}
pub struct OrderManager { pub struct OrderManager {
// receiver: Receiver<SignalKind>, receiver: Receiver<SignalKind>,
tracked_positions: TrackedPositionsMap, tracked_positions: TrackedPositionsMap,
pair: SymbolPair, pair: SymbolPair,
open_orders: Vec<ExecutedOrder>, open_orders: Vec<ExecutedOrder>,
@ -306,13 +339,13 @@ impl OrderManager {
const UNDERCUT_PERC: f64 = 0.005; const UNDERCUT_PERC: f64 = 0.005;
pub fn new( pub fn new(
// receiver: Receiver<SignalKind>, receiver: Receiver<SignalKind>,
pair: SymbolPair, pair: SymbolPair,
client: Client, client: Client,
strategy: Box<dyn OrderStrategy>, strategy: Box<dyn OrderStrategy>,
) -> Self { ) -> Self {
OrderManager { OrderManager {
// receiver, receiver,
pair, pair,
open_orders: Vec::new(), open_orders: Vec::new(),
client, client,
@ -321,6 +354,18 @@ impl OrderManager {
} }
} }
pub async fn handle_message(&mut self, msg: SignalKind) -> Result<(), BoxError> {
match msg {
SignalKind::Update(_) => {
self.update();
}
SignalKind::ClosePosition(position) => self.close_position(&position).await?,
_ => {}
};
Ok(())
}
pub async fn close_position(&mut self, position: &Position) -> Result<(), BoxError> { pub async fn close_position(&mut self, position: &Position) -> Result<(), BoxError> {
let open_order = self.tracked_positions.get(&position.id()); let open_order = self.tracked_positions.get(&position.id());
@ -381,7 +426,7 @@ impl OrderManager {
pub struct ExchangeManager { pub struct ExchangeManager {
kind: ExchangeKind, kind: ExchangeKind,
price_managers: Vec<PriceManagerHandle>, price_managers: Vec<PriceManagerHandle>,
order_managers: Vec<OrderManager>, order_managers: Vec<OrderManagerHandle>,
position_managers: Vec<PositionManagerHandle>, position_managers: Vec<PositionManagerHandle>,
dispatcher: Dispatcher, dispatcher: Dispatcher,
client: Client, client: Client,
@ -401,7 +446,7 @@ impl ExchangeManager {
client.clone(), client.clone(),
Box::new(TrailingStop::new()), Box::new(TrailingStop::new()),
)); ));
order_managers.push(OrderManager::new( order_managers.push(OrderManagerHandle::new(
p.clone(), p.clone(),
client.clone(), client.clone(),
Box::new(FastOrderStrategy {}), Box::new(FastOrderStrategy {}),