core/rustybot/src/managers.rs

378 lines
10 KiB
Rust
Raw Normal View History

2021-01-14 18:56:31 +00:00
use std::collections::HashMap;
2021-01-16 11:43:16 +00:00
use std::ops::Neg;
use bitfinex::ticker::TradingPairTicker;
use log::error;
2021-01-14 12:42:23 +00:00
2021-01-15 10:40:48 +00:00
use crate::connectors::{Client, ExchangeKind};
use crate::currency::SymbolPair;
2021-01-16 11:43:16 +00:00
use crate::events::{Dispatcher, Event, SignalKind};
use crate::models::{ExecutedOrder, OrderForm, OrderKind, Position, PriceTicker};
use crate::strategy::{FastOrderStrategy, OrderStrategy, PositionStrategy};
use crate::BoxError;
2021-01-13 09:24:59 +00:00
2021-01-14 12:42:23 +00:00
pub struct EventManager {
2021-01-13 09:26:29 +00:00
events: Vec<Event>,
}
2021-01-13 09:03:24 +00:00
#[derive(Clone, Debug)]
pub struct PriceManager {
pair: SymbolPair,
prices: Vec<PriceEntry>,
client: Client,
}
impl PriceManager {
pub fn new(pair: SymbolPair, client: Client) -> Self {
PriceManager {
pair,
prices: Vec::new(),
client,
}
}
pub fn add_entry(&mut self, entry: PriceEntry) {
self.prices.push(entry);
}
pub async fn update(&mut self, tick: u64) -> Result<PriceEntry, BoxError> {
let current_prices = self.client.current_prices(&self.pair).await?.into();
Ok(PriceEntry::new(
tick,
current_prices,
self.pair.clone(),
None,
None,
))
}
// fn add_event(&mut self, event: Event) {
// self.events.push(event);
//
// self.dispatcher.call_event_handlers(&event, &self);
// }
//
// fn add_signal(&mut self, signal: SignalKind) {
// self.signals.insert(self.current_tick(), signal);
// }
pub fn pair(&self) -> &SymbolPair {
&self.pair
}
}
#[derive(Clone, Debug)]
pub struct PriceEntry {
tick: u64,
pair: SymbolPair,
price: PriceTicker,
events: Option<Vec<Event>>,
signals: Option<Vec<SignalKind>>,
}
impl PriceEntry {
pub fn new(
tick: u64,
price: PriceTicker,
pair: SymbolPair,
events: Option<Vec<Event>>,
signals: Option<Vec<SignalKind>>,
) -> Self {
PriceEntry {
tick,
pair,
price,
events,
signals,
}
}
pub fn tick(&self) -> u64 {
self.tick
}
pub fn pair(&self) -> &SymbolPair {
&self.pair
}
pub fn price(&self) -> PriceTicker {
self.price
}
pub fn events(&self) -> &Option<Vec<Event>> {
&self.events
}
pub fn signals(&self) -> &Option<Vec<SignalKind>> {
&self.signals
}
}
2021-01-14 18:36:56 +00:00
#[derive(Debug)]
2021-01-14 12:42:23 +00:00
pub struct PositionManager {
2021-01-14 19:20:58 +00:00
current_tick: u64,
2021-01-14 18:36:56 +00:00
pair: SymbolPair,
positions_history: HashMap<u64, Position>,
active_position: Option<Position>,
2021-01-13 09:24:59 +00:00
client: Client,
2021-01-14 12:42:23 +00:00
strategy: Option<Box<dyn PositionStrategy>>,
}
impl PositionManager {
2021-01-14 18:36:56 +00:00
pub fn new(pair: SymbolPair, client: Client) -> Self {
2021-01-14 12:42:23 +00:00
PositionManager {
2021-01-14 19:20:58 +00:00
current_tick: 0,
2021-01-14 18:36:56 +00:00
pair,
positions_history: HashMap::new(),
active_position: None,
2021-01-14 12:42:23 +00:00
client,
strategy: None,
}
}
pub fn with_strategy(mut self, strategy: Box<dyn PositionStrategy>) -> Self {
self.strategy = Some(strategy);
self
}
2021-01-14 19:20:58 +00:00
pub fn current_tick(&self) -> u64 {
self.current_tick
}
2021-01-15 11:10:00 +00:00
pub async fn update(
&mut self,
tick: u64,
) -> Result<(Option<Vec<Event>>, Option<OrderForm>), BoxError> {
2021-01-14 18:56:31 +00:00
let opt_active_positions = self.client.active_positions(&self.pair).await?;
2021-01-14 18:36:56 +00:00
let mut events = vec![];
2021-01-14 19:20:58 +00:00
self.current_tick = tick;
2021-01-14 18:36:56 +00:00
// we assume there is only ONE active position per pair
2021-01-15 11:10:00 +00:00
match opt_active_positions {
// no open positions, no events and no order forms returned
None => return Ok((None, None)),
Some(positions) => {
// checking if there are positions open for our pair
match positions
.into_iter()
.filter(|x| x.pair() == &self.pair)
.next()
{
// no open positions for our pair, setting active position to none
None => self.active_position = None,
// applying strategy to open position and saving into struct
Some(position) => {
let position_after_strategy = {
match &self.strategy {
Some(strategy) => {
let (pos, strategy_events, _) =
2021-01-16 11:43:16 +00:00
strategy.on_new_tick(position, &self);
2021-01-15 11:10:00 +00:00
events.extend(strategy_events);
pos
}
None => position,
}
};
self.positions_history
.insert(self.current_tick(), position_after_strategy.clone());
self.active_position = Some(position_after_strategy);
2021-01-14 18:36:56 +00:00
}
2021-01-15 11:10:00 +00:00
}
2021-01-14 18:36:56 +00:00
}
2021-01-15 11:10:00 +00:00
};
2021-01-14 18:36:56 +00:00
2021-01-15 11:10:00 +00:00
Ok(((events.is_empty().then_some(events)), None))
2021-01-14 12:42:23 +00:00
}
2021-01-14 19:20:58 +00:00
pub fn position_previous_tick(&self, id: u64, tick: Option<u64>) -> Option<&Position> {
let tick = match tick {
Some(tick) => {
if tick < 1 {
1
} else {
tick
}
}
None => self.current_tick() - 1,
};
self.positions_history
.get(&tick)
2021-01-16 11:43:16 +00:00
.filter(|x| x.id() == id)
2021-01-14 19:20:58 +00:00
.and_then(|x| Some(x))
}
2021-01-13 09:24:59 +00:00
}
2021-01-13 09:03:24 +00:00
2021-01-14 12:42:23 +00:00
pub struct OrderManager {
2021-01-16 11:43:16 +00:00
tracked_positions: HashMap<u64, ExecutedOrder>,
2021-01-14 18:36:56 +00:00
pair: SymbolPair,
2021-01-16 11:43:16 +00:00
open_orders: Vec<ExecutedOrder>,
2021-01-13 09:24:59 +00:00
client: Client,
2021-01-16 11:43:16 +00:00
strategy: Box<dyn OrderStrategy>,
2021-01-13 09:24:59 +00:00
}
2021-01-14 12:42:23 +00:00
impl OrderManager {
2021-01-16 11:43:16 +00:00
const UNDERCUT_PERC: f64 = 0.005;
pub fn new(pair: SymbolPair, client: Client, strategy: Box<dyn OrderStrategy>) -> Self {
2021-01-14 12:42:23 +00:00
OrderManager {
2021-01-14 18:36:56 +00:00
pair,
2021-01-14 12:42:23 +00:00
open_orders: Vec::new(),
client,
2021-01-16 11:43:16 +00:00
strategy,
tracked_positions: HashMap::new(),
2021-01-14 12:42:23 +00:00
}
}
2021-01-16 11:43:16 +00:00
pub async fn close_position(&mut self, position: &Position) -> Result<(), BoxError> {
// checking if the position has an open order.
// If so, the strategy method is called, otherwise we open
// an undercut limit order at the best current price.
match self.tracked_positions.get(&position.id()) {
Some(open_order) => self.strategy.on_position_close(open_order, &self),
None => {
let current_prices = self.client.current_prices(&self.pair).await?;
let closing_price = self.best_closing_price(&position, &current_prices)?;
// submitting order
let order_form = OrderForm::new(
&self.pair,
closing_price,
position.amount().neg(),
OrderKind::Limit,
);
match self.client.submit_order(order_form).await {
Err(e) => error!("Could not submit order: {}", e),
Ok(o) => {
self.tracked_positions.insert(position.id(), o);
}
};
}
}
Ok(())
}
2021-01-14 12:42:23 +00:00
pub fn update(&self) -> Option<Vec<Event>> {
unimplemented!()
}
2021-01-16 11:43:16 +00:00
pub fn best_closing_price(
&self,
position: &Position,
price_ticker: &TradingPairTicker,
) -> Result<f64, BoxError> {
let price = {
if position.is_short() {
price_ticker.ask
} else {
price_ticker.bid
}
};
Ok(price * (1.0 - OrderManager::UNDERCUT_PERC))
}
2021-01-14 12:42:23 +00:00
}
2021-01-15 10:40:48 +00:00
pub struct ExchangeManager {
kind: ExchangeKind,
price_managers: Vec<PriceManager>,
order_managers: Vec<OrderManager>,
position_managers: Vec<PositionManager>,
2021-01-16 11:43:16 +00:00
dispatcher: Dispatcher,
2021-01-15 10:40:48 +00:00
client: Client,
}
impl ExchangeManager {
pub fn new(kind: &ExchangeKind, pairs: &Vec<SymbolPair>) -> Self {
let client = Client::new(kind);
let mut position_managers = Vec::new();
let mut order_managers = Vec::new();
let mut price_managers = Vec::new();
for p in pairs {
position_managers.push(PositionManager::new(p.clone(), client.clone()));
2021-01-16 11:43:16 +00:00
order_managers.push(OrderManager::new(
p.clone(),
client.clone(),
Box::new(FastOrderStrategy {}),
));
2021-01-15 10:40:48 +00:00
price_managers.push(PriceManager::new(p.clone(), client.clone()));
}
ExchangeManager {
kind: kind.clone(),
position_managers,
order_managers,
price_managers,
client,
2021-01-16 11:43:16 +00:00
dispatcher: Dispatcher::new(),
2021-01-15 10:40:48 +00:00
}
}
pub fn with_position_strategy(mut self, strategy: Box<dyn PositionStrategy>) -> Self {
self.position_managers = self
.position_managers
.into_iter()
.map(|x| x.with_strategy(dyn_clone::clone_box(&*strategy)))
.collect();
self
}
pub async fn update_managers(&mut self, tick: u64) -> Result<(), BoxError> {
self.update_price_managers(tick).await?;
self.update_position_managers(tick).await?;
Ok(())
}
async fn update_position_managers(
&mut self,
tick: u64,
) -> Result<Option<Vec<Event>>, BoxError> {
for mgr in &mut self.position_managers {
println!("Manager: {:?}", mgr);
mgr.update(tick).await?;
}
Ok(None)
}
async fn update_price_managers(&mut self, tick: u64) -> Result<Option<Vec<Event>>, BoxError> {
let futures: Vec<_> = self
.price_managers
.clone()
.into_iter()
// the only reason you need the async block is that the future
// returned by x.update(tick) borrows from x
// so we create a future that first takes ownership of x, then uses it to call x.update
.map(|mut x| async move { x.update(tick).await })
.map(tokio::spawn)
.collect();
let mut price_entries = vec![];
for f in futures {
price_entries.push(f.await??);
}
for manager in &mut self.price_managers {
let prices: Vec<_> = price_entries
.drain_filter(|x| x.pair() == manager.pair())
.collect();
for p in prices {
manager.add_entry(p);
}
}
Ok(None)
}
}