refactored SignalKind into Message and ActorMessage

This commit is contained in:
Giulio De Pasquale 2021-01-18 11:54:40 +00:00
parent 3512dce35b
commit 8283ecde60
3 changed files with 208 additions and 162 deletions

View File

@ -3,17 +3,24 @@ use std::future::Future;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use crate::managers::{OrderManager, PositionManager, PriceManager}; use crate::managers::{OptionUpdate, OrderManager, PositionManager, PriceManager};
use crate::models::{Position, PositionProfitState}; use crate::models::{Position, PositionProfitState};
use tokio::sync::oneshot;
#[derive(Clone, Debug, Hash, PartialEq, Eq)] #[derive(Debug)]
pub enum SignalKind { pub struct ActorMessage {
Update(u64), pub(crate) message: Message,
ClosePosition(Position), pub(crate) respond_to: oneshot::Sender<OptionUpdate>,
}
#[derive(Debug)]
pub enum Message {
Update { tick: u64 },
ClosePosition { position: Position },
OpenPosition, OpenPosition,
} }
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct EventMetadata { pub struct EventMetadata {
position_id: Option<u64>, position_id: Option<u64>,
order_id: Option<u64>, order_id: Option<u64>,
@ -44,7 +51,7 @@ pub enum EventKind {
Any, Any,
} }
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
pub struct Event { pub struct Event {
kind: EventKind, kind: EventKind,
tick: u64, tick: u64,
@ -74,113 +81,113 @@ impl Event {
self.metadata self.metadata
} }
} }
//
pub struct Dispatcher { // pub struct Dispatcher {
event_handlers: HashMap<EventKind, Vec<Box<dyn Fn(&Event, &PriceManager) -> JoinHandle<()>>>>, // event_handlers: HashMap<EventKind, Vec<Box<dyn Fn(&Event, &PriceManager) -> JoinHandle<()>>>>,
profit_state_handlers: // profit_state_handlers:
HashMap<PositionProfitState, Vec<Box<dyn Fn(&Position, &PriceManager) -> JoinHandle<()>>>>, // HashMap<PositionProfitState, Vec<Box<dyn Fn(&Position, &PriceManager) -> JoinHandle<()>>>>,
signal_handlers: HashMap<SignalKind, Vec<Box<dyn Fn(&SignalKind) -> JoinHandle<()>>>>, // signal_handlers: HashMap<SignalKind, Vec<Box<dyn Fn(&SignalKind) -> JoinHandle<()>>>>,
//
on_any_event_handlers: Vec<Box<dyn Fn(&Event, &PriceManager) -> JoinHandle<()>>>, // on_any_event_handlers: Vec<Box<dyn Fn(&Event, &PriceManager) -> JoinHandle<()>>>,
on_any_profit_state_handlers: Vec<Box<dyn Fn(&Position, &PriceManager) -> JoinHandle<()>>>, // on_any_profit_state_handlers: Vec<Box<dyn Fn(&Position, &PriceManager) -> JoinHandle<()>>>,
} // }
//
impl Dispatcher { // impl Dispatcher {
pub fn new() -> Self { // pub fn new() -> Self {
Dispatcher { // Dispatcher {
event_handlers: HashMap::new(), // event_handlers: HashMap::new(),
profit_state_handlers: HashMap::new(), // profit_state_handlers: HashMap::new(),
signal_handlers: HashMap::new(), // signal_handlers: HashMap::new(),
on_any_event_handlers: Vec::new(), // on_any_event_handlers: Vec::new(),
on_any_profit_state_handlers: Vec::new(), // on_any_profit_state_handlers: Vec::new(),
} // }
} // }
//
pub fn call_signal_handlers(&self, signal: &SignalKind) { // pub fn call_signal_handlers(&self, signal: &SignalKind) {
if let Some(functions) = self.signal_handlers.get(&signal) { // if let Some(functions) = self.signal_handlers.get(&signal) {
for f in functions { // for f in functions {
f(signal); // f(signal);
} // }
} // }
} // }
//
pub fn call_event_handlers(&self, event: &Event, status: &PriceManager) { // pub fn call_event_handlers(&self, event: &Event, status: &PriceManager) {
if let Some(functions) = self.event_handlers.get(&event.kind()) { // if let Some(functions) = self.event_handlers.get(&event.kind()) {
for f in functions { // for f in functions {
f(event, status); // f(event, status);
} // }
} // }
//
for f in &self.on_any_event_handlers { // for f in &self.on_any_event_handlers {
f(event, status); // f(event, status);
} // }
} // }
//
pub fn call_position_state_handlers(&self, position: &Position, status: &PriceManager) { // pub fn call_position_state_handlers(&self, position: &Position, status: &PriceManager) {
if let Some(profit_state) = &position.profit_state() { // if let Some(profit_state) = &position.profit_state() {
if let Some(functions) = self.profit_state_handlers.get(profit_state) { // if let Some(functions) = self.profit_state_handlers.get(profit_state) {
for f in functions { // for f in functions {
f(position, status); // f(position, status);
} // }
} // }
} // }
//
for f in &self.on_any_profit_state_handlers { // for f in &self.on_any_profit_state_handlers {
f(position, status); // f(position, status);
} // }
} // }
//
pub fn register_event_handler<F: 'static, Fut: 'static>(&mut self, event: EventKind, f: F) // pub fn register_event_handler<F: 'static, Fut: 'static>(&mut self, event: EventKind, f: F)
where // where
F: Fn(&Event, &PriceManager) -> Fut, // F: Fn(&Event, &PriceManager) -> Fut,
Fut: Future<Output = ()> + Send, // Fut: Future<Output = ()> + Send,
{ // {
match event { // match event {
EventKind::Any => self // EventKind::Any => self
.on_any_event_handlers // .on_any_event_handlers
.push(Box::new(move |e, s| tokio::spawn(f(&e, s)))), // .push(Box::new(move |e, s| tokio::spawn(f(&e, s)))),
_ => self // _ => self
.event_handlers // .event_handlers
.entry(event) // .entry(event)
.or_default() // .or_default()
.push(Box::new(move |e, s| tokio::spawn(f(&e, s)))), // .push(Box::new(move |e, s| tokio::spawn(f(&e, s)))),
} // }
} // }
//
pub fn register_positionstate_handler<F: 'static, Fut: 'static>( // pub fn register_positionstate_handler<F: 'static, Fut: 'static>(
&mut self, // &mut self,
state: PositionProfitState, // state: PositionProfitState,
f: F, // f: F,
) where // ) where
F: Fn(&Position, &PriceManager) -> Fut, // F: Fn(&Position, &PriceManager) -> Fut,
Fut: Future<Output = ()> + Send, // Fut: Future<Output = ()> + Send,
{ // {
match state { // match state {
// PositionProfitState::Any => self // // PositionProfitState::Any => self
// .on_any_position_state_handlers // // .on_any_position_state_handlers
// .push(Box::new(move |p, s| tokio::spawn(f(&p, s)))), // // .push(Box::new(move |p, s| tokio::spawn(f(&p, s)))),
_ => self // _ => self
.profit_state_handlers // .profit_state_handlers
.entry(state) // .entry(state)
.or_default() // .or_default()
.push(Box::new(move |p, s| tokio::spawn(f(&p, s)))), // .push(Box::new(move |p, s| tokio::spawn(f(&p, s)))),
} // }
} // }
//
pub fn register_signal_handler<F: 'static, Fut: 'static>(&mut self, signal: SignalKind, f: F) // pub fn register_signal_handler<F: 'static, Fut: 'static>(&mut self, signal: SignalKind, f: F)
where // where
F: Fn(&SignalKind) -> Fut, // F: Fn(&SignalKind) -> Fut,
Fut: Future<Output = ()> + Send, // Fut: Future<Output = ()> + Send,
{ // {
match signal { // match signal {
// PositionProfitState::Any => self // // PositionProfitState::Any => self
// .on_any_position_state_handlers // // .on_any_position_state_handlers
// .push(Box::new(move |p, s| tokio::spawn(f(&p, s)))), // // .push(Box::new(move |p, s| tokio::spawn(f(&p, s)))),
_ => self // _ => self
.signal_handlers // .signal_handlers
.entry(signal) // .entry(signal)
.or_default() // .or_default()
.push(Box::new(move |s| tokio::spawn(f(s)))), // .push(Box::new(move |s| tokio::spawn(f(s)))),
} // }
} // }
} // }

View File

@ -8,15 +8,16 @@ use log::{debug, error, info};
use tokio::signal::unix::Signal; use tokio::signal::unix::Signal;
use tokio::sync::mpsc::channel; use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::oneshot;
use crate::connectors::{Client, ExchangeKind}; use crate::connectors::{Client, ExchangeKind};
use crate::currency::SymbolPair; use crate::currency::SymbolPair;
use crate::events::{Dispatcher, Event, SignalKind}; use crate::events::{ActorMessage, Event, Message};
use crate::models::{ExecutedOrder, OrderForm, OrderKind, Position, PriceTicker}; use crate::models::{ExecutedOrder, OrderForm, OrderKind, Position, PriceTicker};
use crate::strategy::{FastOrderStrategy, OrderStrategy, PositionStrategy, TrailingStop}; use crate::strategy::{FastOrderStrategy, OrderStrategy, PositionStrategy, TrailingStop};
use crate::BoxError; use crate::BoxError;
type OptionUpdate = (Option<Vec<Event>>, Option<Vec<SignalKind>>); pub type OptionUpdate = (Option<Vec<Event>>, Option<Vec<Message>>);
pub struct EventManager { pub struct EventManager {
events: Vec<Event>, events: Vec<Event>,
@ -28,14 +29,14 @@ pub struct EventManager {
#[derive(Debug)] #[derive(Debug)]
pub struct PriceManager { pub struct PriceManager {
receiver: Receiver<SignalKind>, receiver: Receiver<ActorMessage>,
pair: SymbolPair, pair: SymbolPair,
prices: Vec<PriceEntry>, prices: Vec<PriceEntry>,
client: Client, client: Client,
} }
pub struct PriceManagerHandle { pub struct PriceManagerHandle {
sender: Sender<SignalKind>, sender: Sender<ActorMessage>,
} }
impl PriceManagerHandle { impl PriceManagerHandle {
@ -54,13 +55,22 @@ impl PriceManagerHandle {
Self { sender } Self { sender }
} }
pub async fn update(&mut self, tick: u64) { pub async fn update(&mut self, tick: u64) -> Result<OptionUpdate, BoxError> {
self.sender.send(SignalKind::Update(tick)).await.unwrap(); let (send, recv) = oneshot::channel();
self.sender
.send(ActorMessage {
message: Message::Update { tick },
respond_to: send,
})
.await?;
Ok(recv.await?)
} }
} }
impl PriceManager { impl PriceManager {
pub fn new(receiver: Receiver<SignalKind>, pair: SymbolPair, client: Client) -> Self { pub fn new(receiver: Receiver<ActorMessage>, pair: SymbolPair, client: Client) -> Self {
PriceManager { PriceManager {
receiver, receiver,
pair, pair,
@ -69,9 +79,9 @@ impl PriceManager {
} }
} }
pub async fn handle_message(&mut self, message: SignalKind) -> Result<(), BoxError> { pub async fn handle_message(&mut self, message: ActorMessage) -> Result<(), BoxError> {
match message { match message.message {
SignalKind::Update(tick) => { Message::Update { tick } => {
let a = self.update(tick).await?; let a = self.update(tick).await?;
self.add_entry(a); self.add_entry(a);
} }
@ -93,7 +103,7 @@ impl PriceManager {
current_prices, current_prices,
self.pair.clone(), self.pair.clone(),
None, None,
None, // None,
)) ))
} }
@ -118,7 +128,7 @@ pub struct PriceEntry {
pair: SymbolPair, pair: SymbolPair,
price: PriceTicker, price: PriceTicker,
events: Option<Vec<Event>>, events: Option<Vec<Event>>,
signals: Option<Vec<SignalKind>>, // signals: Option<Vec<SignalKind>>,
} }
impl PriceEntry { impl PriceEntry {
@ -127,14 +137,14 @@ impl PriceEntry {
price: PriceTicker, price: PriceTicker,
pair: SymbolPair, pair: SymbolPair,
events: Option<Vec<Event>>, events: Option<Vec<Event>>,
signals: Option<Vec<SignalKind>>, // signals: Option<Vec<SignalKind>>,
) -> Self { ) -> Self {
PriceEntry { PriceEntry {
tick, tick,
pair, pair,
price, price,
events, events,
signals, // signals,
} }
} }
@ -150,9 +160,9 @@ impl PriceEntry {
pub fn events(&self) -> &Option<Vec<Event>> { pub fn events(&self) -> &Option<Vec<Event>> {
&self.events &self.events
} }
pub fn signals(&self) -> &Option<Vec<SignalKind>> { // pub fn signals(&self) -> &Option<Vec<SignalKind>> {
&self.signals // &self.signals
} // }
} }
/****************** /******************
@ -160,7 +170,7 @@ impl PriceEntry {
******************/ ******************/
pub struct PositionManagerHandle { pub struct PositionManagerHandle {
sender: Sender<SignalKind>, sender: Sender<ActorMessage>,
} }
impl PositionManagerHandle { impl PositionManagerHandle {
@ -180,14 +190,23 @@ impl PositionManagerHandle {
Self { sender } Self { sender }
} }
pub async fn update(&mut self, tick: u64) { pub async fn update(&mut self, tick: u64) -> Result<OptionUpdate, BoxError> {
self.sender.send(SignalKind::Update(tick)).await.unwrap(); let (send, recv) = oneshot::channel();
self.sender
.send(ActorMessage {
message: Message::Update { tick },
respond_to: send,
})
.await?;
Ok(recv.await?)
} }
} }
#[derive(Debug)] #[derive(Debug)]
pub struct PositionManager { pub struct PositionManager {
receiver: Receiver<SignalKind>, receiver: Receiver<ActorMessage>,
current_tick: u64, current_tick: u64,
pair: SymbolPair, pair: SymbolPair,
positions_history: HashMap<u64, Position>, positions_history: HashMap<u64, Position>,
@ -198,7 +217,7 @@ pub struct PositionManager {
impl PositionManager { impl PositionManager {
pub fn new( pub fn new(
receiver: Receiver<SignalKind>, receiver: Receiver<ActorMessage>,
pair: SymbolPair, pair: SymbolPair,
client: Client, client: Client,
strategy: Box<dyn PositionStrategy>, strategy: Box<dyn PositionStrategy>,
@ -218,10 +237,12 @@ impl PositionManager {
self.current_tick self.current_tick
} }
pub async fn handle_message(&mut self, msg: SignalKind) -> Result<(), BoxError> { pub async fn handle_message(&mut self, msg: ActorMessage) -> Result<(), BoxError> {
match msg { match msg.message {
SignalKind::Update(tick) => { Message::Update { tick } => {
self.update(tick).await?; let result = self.update(tick).await?;
msg.respond_to.send(result);
} }
_ => {} _ => {}
}; };
@ -296,7 +317,7 @@ impl PositionManager {
pub type TrackedPositionsMap = HashMap<u64, ExecutedOrder>; pub type TrackedPositionsMap = HashMap<u64, ExecutedOrder>;
pub struct OrderManagerHandle { pub struct OrderManagerHandle {
sender: Sender<SignalKind>, sender: Sender<ActorMessage>,
} }
impl OrderManagerHandle { impl OrderManagerHandle {
@ -316,20 +337,35 @@ impl OrderManagerHandle {
Self { sender } Self { sender }
} }
pub async fn update(&mut self, tick: u64) { pub async fn update(&mut self, tick: u64) -> Result<OptionUpdate, BoxError> {
self.sender.send(SignalKind::Update(tick)).await.unwrap(); let (send, recv) = oneshot::channel();
self.sender
.send(ActorMessage {
message: Message::Update { tick },
respond_to: send,
})
.await?;
Ok(recv.await?)
} }
pub async fn close_position(&mut self, position: Position) { pub async fn close_position(&mut self, position: Position) -> Result<OptionUpdate, BoxError> {
let (send, recv) = oneshot::channel();
self.sender self.sender
.send(SignalKind::ClosePosition(position)) .send(ActorMessage {
.await message: Message::ClosePosition { position },
.unwrap(); respond_to: send,
})
.await?;
Ok(recv.await?)
} }
} }
pub struct OrderManager { pub struct OrderManager {
receiver: Receiver<SignalKind>, receiver: Receiver<ActorMessage>,
tracked_positions: TrackedPositionsMap, tracked_positions: TrackedPositionsMap,
pair: SymbolPair, pair: SymbolPair,
open_orders: Vec<ExecutedOrder>, open_orders: Vec<ExecutedOrder>,
@ -341,7 +377,7 @@ impl OrderManager {
const UNDERCUT_PERC: f64 = 0.005; const UNDERCUT_PERC: f64 = 0.005;
pub fn new( pub fn new(
receiver: Receiver<SignalKind>, receiver: Receiver<ActorMessage>,
pair: SymbolPair, pair: SymbolPair,
client: Client, client: Client,
strategy: Box<dyn OrderStrategy>, strategy: Box<dyn OrderStrategy>,
@ -356,12 +392,12 @@ impl OrderManager {
} }
} }
pub async fn handle_message(&mut self, msg: SignalKind) -> Result<(), BoxError> { pub async fn handle_message(&mut self, msg: ActorMessage) -> Result<(), BoxError> {
match msg { match msg.message {
SignalKind::Update(_) => { Message::Update { .. } => {
self.update(); self.update();
} }
SignalKind::ClosePosition(position) => self.close_position(&position).await?, Message::ClosePosition { position, .. } => self.close_position(&position).await?,
_ => {} _ => {}
}; };
@ -438,7 +474,7 @@ pub struct PairManager {
price_manager: PriceManagerHandle, price_manager: PriceManagerHandle,
order_manager: OrderManagerHandle, order_manager: OrderManagerHandle,
position_manager: PositionManagerHandle, position_manager: PositionManagerHandle,
dispatcher: Dispatcher, // dispatcher: Dispatcher,
} }
impl PairManager { impl PairManager {
@ -456,7 +492,7 @@ impl PairManager {
client.clone(), client.clone(),
Box::new(TrailingStop::new()), Box::new(TrailingStop::new()),
), ),
dispatcher: Dispatcher::new(), // dispatcher: Dispatcher::new(),
} }
} }
} }

View File

@ -3,9 +3,10 @@ use std::fmt::{Debug, Formatter};
use dyn_clone::DynClone; use dyn_clone::DynClone;
use crate::events::{Event, EventKind, EventMetadata, SignalKind}; use crate::events::{Event, EventKind, EventMetadata, Message};
use crate::managers::{OrderManager, PositionManager, TrackedPositionsMap}; use crate::managers::{OrderManager, PositionManager, TrackedPositionsMap};
use crate::models::{ExecutedOrder, OrderForm, Position, PositionProfitState}; use crate::models::{ExecutedOrder, OrderForm, Position, PositionProfitState};
use tokio::sync::oneshot;
/*************** /***************
* DEFINITIONS * DEFINITIONS
@ -17,7 +18,7 @@ pub trait PositionStrategy: DynClone + Send {
&self, &self,
position: Position, position: Position,
manager: &PositionManager, manager: &PositionManager,
) -> (Position, Option<Vec<Event>>, Option<Vec<SignalKind>>); ) -> (Position, Option<Vec<Event>>, Option<Vec<Message>>);
} }
impl Debug for dyn PositionStrategy { impl Debug for dyn PositionStrategy {
@ -84,7 +85,7 @@ impl PositionStrategy for TrailingStop {
&self, &self,
position: Position, position: Position,
manager: &PositionManager, manager: &PositionManager,
) -> (Position, Option<Vec<Event>>, Option<Vec<SignalKind>>) { ) -> (Position, Option<Vec<Event>>, Option<Vec<Message>>) {
let mut signals = vec![]; let mut signals = vec![];
let events = vec![]; let events = vec![];
@ -102,7 +103,9 @@ impl PositionStrategy for TrailingStop {
} else if TrailingStop::MAX_LOSS_PERC < pl_perc && pl_perc < 0.0 { } else if TrailingStop::MAX_LOSS_PERC < pl_perc && pl_perc < 0.0 {
PositionProfitState::Loss PositionProfitState::Loss
} else { } else {
signals.push(SignalKind::ClosePosition(position.clone())); signals.push(Message::ClosePosition {
position: position.clone(),
});
PositionProfitState::Critical PositionProfitState::Critical
} }
}; };