rust #10

Merged
peperunas merged 127 commits from rust into master 2021-02-18 09:42:16 +00:00
3 changed files with 117 additions and 83 deletions
Showing only changes of commit befa1d4bec - Show all commits

View File

@ -1,4 +1,5 @@
use core::time::Duration;
use std::collections::HashMap;
use log::{debug, error, info};
use tokio::time::delay_for;
@ -6,7 +7,7 @@ use tokio::time::delay_for;
use crate::connectors::{Client, ExchangeKind};
use crate::currency::{Symbol, SymbolPair};
use crate::events::Event;
use crate::managers::{OrderManager, PositionManager, PriceManager};
use crate::managers::{ExchangeManager, OrderManager, PositionManager, PriceManager};
use crate::strategy::PositionStrategy;
use crate::ticker::Ticker;
use crate::BoxError;
@ -15,9 +16,7 @@ pub struct BfxBot {
ticker: Ticker,
quote: Symbol,
trading_symbols: Vec<Symbol>,
price_managers: Vec<PriceManager>,
order_managers: Vec<OrderManager>,
pos_managers: Vec<PositionManager>,
exchange_managers: Vec<ExchangeManager>,
}
impl BfxBot {
@ -27,48 +26,36 @@ impl BfxBot {
quote: Symbol,
tick_duration: Duration,
) -> Self {
let clients: Vec<_> = exchanges.iter().map(|x| Client::new(x)).collect();
let pairs: Vec<_> = trading_symbols
.iter()
.map(|x| SymbolPair::new(quote.clone(), x.clone()))
.collect();
let mut pos_managers = Vec::new();
let mut order_managers = Vec::new();
let mut price_managers = Vec::new();
for c in clients {
for p in &pairs {
pos_managers.push(PositionManager::new(p.clone(), c.clone()));
order_managers.push(OrderManager::new(p.clone(), c.clone()));
price_managers.push(PriceManager::new(p.clone(), c.clone()));
}
}
let exchange_managers = exchanges
.iter()
.map(|x| ExchangeManager::new(x, &pairs))
.collect();
BfxBot {
ticker: Ticker::new(tick_duration),
price_managers,
quote,
trading_symbols,
order_managers,
pos_managers,
exchange_managers,
}
}
pub fn with_position_strategy(mut self, strategy: Box<dyn PositionStrategy>) -> Self {
self.pos_managers = self
.pos_managers
self.exchange_managers = self
.exchange_managers
.into_iter()
.map(|x| x.with_strategy(dyn_clone::clone_box(&*strategy)))
.map(|x| x.with_position_strategy(dyn_clone::clone_box(&*strategy)))
.collect();
self
}
pub async fn start_loop(&mut self) -> Result<(), BoxError> {
if let Err(e) = self.update_managers().await {
error!("Error while starting managers: {}", e);
}
self.update_exchanges().await?;
loop {
info!("Current tick: {}", self.ticker.current_tick());
@ -79,65 +66,22 @@ impl BfxBot {
}
}
async fn update_exchanges(&mut self) -> Result<(), BoxError> {
for e in &mut self.exchange_managers {
if let Err(err) = e.update_managers(self.ticker.current_tick()).await {
error!("Error while updating managers: {}", err);
}
}
Ok(())
}
async fn update(&mut self) -> Result<(), BoxError> {
delay_for(self.ticker.duration()).await;
self.ticker.inc();
if let Err(e) = self.update_managers().await {
error!("Error while updating managers: {}", e);
}
self.update_exchanges().await?;
Ok(())
}
async fn update_managers(&mut self) -> Result<(), BoxError> {
self.update_price_managers().await?;
self.update_position_managers().await?;
Ok(())
}
async fn update_position_managers(&mut self) -> Result<Option<Vec<Event>>, BoxError> {
for mgr in &mut self.pos_managers {
let tick = self.ticker.current_tick();
mgr.update(tick).await?;
}
Ok(None)
}
async fn update_price_managers(&mut self) -> Result<Option<Vec<Event>>, BoxError> {
let futures: Vec<_> = self
.price_managers
.clone()
.into_iter()
// the only reason you need the async block is that the future
// returned by x.update(tick) borrows from x
// so we create a future that first takes ownership of x, then uses it to call x.update
.map(|mut x| {
let tick = self.ticker.current_tick();
async move { x.update(tick).await }
})
.map(tokio::spawn)
.collect();
let mut price_entries = vec![];
for f in futures {
price_entries.push(f.await??);
}
for manager in &mut self.price_managers {
let prices: Vec<_> = price_entries
.drain_filter(|x| x.pair() == manager.pair())
.collect();
for p in prices {
manager.add_entry(p);
}
}
Ok(None)
}
}

View File

@ -26,12 +26,10 @@ async fn main() -> Result<(), BoxError> {
let test_api_key = "P1EVE68DJByDAkGQvpIkTwfrbYXd2Vo2ZaIhTYb9vx2";
let test_api_secret = "1nicg8z0zKVEt5Rb7ZDpIYjVYVTgvCaCPMZqB0niFli";
let affiliate_code = "XPebOgHxA";
let bitfinex = ExchangeKind::Bitfinex {
api_key: test_api_key.into(),
api_secret: test_api_secret.into(),
affiliate_code: Some(affiliate_code.into()),
};
let mut bot = BfxBot::new(
@ -65,7 +63,7 @@ fn setup_logger() -> Result<(), fern::InitError> {
.level(Debug)
.filter(|metadata| metadata.target().contains("rustybot"))
.chain(std::io::stdout())
.chain(fern::log_file("rustico.log")?)
// .chain(fern::log_file("rustico.log")?)
.apply()?;
Ok(())

View File

@ -1,6 +1,6 @@
use std::collections::HashMap;
use crate::connectors::Client;
use crate::connectors::{Client, ExchangeKind};
use crate::currency::SymbolPair;
use crate::events::{Event, SignalKind};
use crate::models::{Order, Position, PriceTicker};
@ -245,3 +245,95 @@ impl OrderManager {
unimplemented!()
}
}
pub struct ExchangeManager {
kind: ExchangeKind,
price_managers: Vec<PriceManager>,
order_managers: Vec<OrderManager>,
position_managers: Vec<PositionManager>,
client: Client,
}
impl ExchangeManager {
pub fn new(kind: &ExchangeKind, pairs: &Vec<SymbolPair>) -> Self {
let client = Client::new(kind);
let mut position_managers = Vec::new();
let mut order_managers = Vec::new();
let mut price_managers = Vec::new();
for p in pairs {
position_managers.push(PositionManager::new(p.clone(), client.clone()));
order_managers.push(OrderManager::new(p.clone(), client.clone()));
price_managers.push(PriceManager::new(p.clone(), client.clone()));
}
ExchangeManager {
kind: kind.clone(),
position_managers,
order_managers,
price_managers,
client,
}
}
pub fn with_position_strategy(mut self, strategy: Box<dyn PositionStrategy>) -> Self {
self.position_managers = self
.position_managers
.into_iter()
.map(|x| x.with_strategy(dyn_clone::clone_box(&*strategy)))
.collect();
self
}
pub async fn update_managers(&mut self, tick: u64) -> Result<(), BoxError> {
self.update_price_managers(tick).await?;
self.update_position_managers(tick).await?;
Ok(())
}
async fn update_position_managers(
&mut self,
tick: u64,
) -> Result<Option<Vec<Event>>, BoxError> {
for mgr in &mut self.position_managers {
println!("Manager: {:?}", mgr);
mgr.update(tick).await?;
}
Ok(None)
}
async fn update_price_managers(&mut self, tick: u64) -> Result<Option<Vec<Event>>, BoxError> {
let futures: Vec<_> = self
.price_managers
.clone()
.into_iter()
// the only reason you need the async block is that the future
// returned by x.update(tick) borrows from x
// so we create a future that first takes ownership of x, then uses it to call x.update
.map(|mut x| async move { x.update(tick).await })
.map(tokio::spawn)
.collect();
let mut price_entries = vec![];
for f in futures {
price_entries.push(f.await??);
}
for manager in &mut self.price_managers {
let prices: Vec<_> = price_entries
.drain_filter(|x| x.pair() == manager.pair())
.collect();
for p in prices {
manager.add_entry(p);
}
}
Ok(None)
}
}