From 307bbb1b0c5ad7aca4c662741011053208579f51 Mon Sep 17 00:00:00 2001 From: Giulio De Pasquale Date: Tue, 26 Jan 2021 11:15:04 +0000 Subject: [PATCH] retry signed requests --- rustybot/Cargo.lock | 103 ++++++++++++++++++++++++++++++++----- rustybot/Cargo.toml | 3 +- rustybot/src/connectors.rs | 92 +++++++++++++++++++++++++++------ rustybot/src/main.rs | 30 +++++++---- rustybot/src/managers.rs | 35 +++++-------- 5 files changed, 202 insertions(+), 61 deletions(-) diff --git a/rustybot/Cargo.lock b/rustybot/Cargo.lock index c3f0e6a..2d62993 100644 --- a/rustybot/Cargo.lock +++ b/rustybot/Cargo.lock @@ -371,46 +371,108 @@ dependencies = [ ] [[package]] -name = "futures-channel" -version = "0.3.8" +name = "futures" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b7109687aa4e177ef6fe84553af6280ef2778bdb7783ba44c9dc3399110fe64" +checksum = "da9052a1a50244d8d5aa9bf55cbc2fb6f357c86cc52e46c62ed390a7180cf150" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2d31b7ec7efab6eefc7c57233bb10b847986139d88cc2f5a02a1ae6871a1846" dependencies = [ "futures-core", + "futures-sink", ] [[package]] name = "futures-core" -version = "0.3.8" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "847ce131b72ffb13b6109a221da9ad97a64cbe48feb1028356b836b47b8f1748" +checksum = "79e5145dde8da7d1b3892dad07a9c98fc04bc39892b1ecc9692cf53e2b780a65" + +[[package]] +name = "futures-executor" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9e59fdc009a4b3096bf94f740a0f2424c082521f20a9b08c5c07c48d90fd9b9" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28be053525281ad8259d47e4de5de657b25e7bac113458555bb4b70bc6870500" + +[[package]] +name = "futures-macro" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c287d25add322d9f9abdcdc5927ca398917996600182178774032e9f8258fedd" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-retry" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fde5a672a61f96552aa5ed9fd9c81c3fbdae4be9b1e205d6eaf17c83705adc0f" +dependencies = [ + "futures", + "pin-project-lite", + "tokio 1.1.0", +] [[package]] name = "futures-sink" -version = "0.3.8" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f878195a49cee50e006b02b93cf7e0a95a38ac7b776b4c4d9cc1207cd20fcb3d" +checksum = "caf5c69029bda2e743fddd0582d1083951d65cc9539aebf8812f36c3491342d6" [[package]] name = "futures-task" -version = "0.3.8" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c554eb5bf48b2426c4771ab68c6b14468b6e76cc90996f528c3338d761a4d0d" +checksum = "13de07eb8ea81ae445aca7b69f5f7bf15d7bf4912d8ca37d6645c77ae8a58d86" dependencies = [ "once_cell", ] [[package]] name = "futures-util" -version = "0.3.8" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d304cff4a7b99cfb7986f7d43fbe93d175e72e704a8860787cc95e9ffd85cbd2" +checksum = "632a8cd0f2a4b3fdea1657f08bde063848c3bd00f9bbf6e256b8be78802e624b" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", "futures-sink", "futures-task", - "pin-project 1.0.2", + "memchr", + "pin-project-lite", "pin-utils", + "proc-macro-hack", + "proc-macro-nested", "slab", ] @@ -959,9 +1021,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.0" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b063f57ec186e6140e2b8b6921e5f1bd89c7356dda5b33acc5401203ca6131c" +checksum = "439697af366c49a6d0a010c56a0d97685bc140ce0d377b13a2ea2aa42d64a827" [[package]] name = "pin-utils" @@ -1005,6 +1067,18 @@ dependencies = [ "version_check", ] +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + +[[package]] +name = "proc-macro-nested" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" + [[package]] name = "proc-macro2" version = "1.0.24" @@ -1203,6 +1277,7 @@ dependencies = [ "dyn-clone", "fern", "float-cmp", + "futures-retry", "futures-util", "log 0.4.11", "merge", diff --git a/rustybot/Cargo.toml b/rustybot/Cargo.toml index 7a60bee..b22004a 100644 --- a/rustybot/Cargo.toml +++ b/rustybot/Cargo.toml @@ -19,4 +19,5 @@ fern = {version = "0.6", features = ["colored"]} chrono = "0.4" byteorder = "1" float-cmp = "0.8" -merge = "0.1" \ No newline at end of file +merge = "0.1" +futures-retry = "0.6" \ No newline at end of file diff --git a/rustybot/src/connectors.rs b/rustybot/src/connectors.rs index 2a74e83..626a995 100644 --- a/rustybot/src/connectors.rs +++ b/rustybot/src/connectors.rs @@ -7,6 +7,10 @@ use async_trait::async_trait; use bitfinex::api::Bitfinex; use bitfinex::orders::{CancelOrderForm, OrderMeta, OrderResponse}; use bitfinex::ticker::TradingPairTicker; +use futures_retry::{FutureRetry, RetryPolicy, StreamRetryExt}; +use futures_util::task::FutureObj; +use log::trace; +use tokio::time::Duration; use crate::currency::SymbolPair; use crate::models::{ @@ -14,6 +18,8 @@ use crate::models::{ PriceTicker, TradingPlatform, }; use crate::BoxError; +use std::error::Error; +use tokio::macros::support::Future; #[derive(PartialEq, Eq, Clone, Copy, Debug)] pub enum Exchange { @@ -142,6 +148,13 @@ pub struct BitfinexConnector { impl BitfinexConnector { const AFFILIATE_CODE: &'static str = "XPebOgHxA"; + fn handle_small_nonce_error(e: BoxError) -> RetryPolicy { + if e.to_string().contains("nonce: small") { + return RetryPolicy::WaitRetry(Duration::from_millis(1)); + } + return RetryPolicy::ForwardError(e); + } + pub fn new(api_key: &str, api_secret: &str) -> Self { BitfinexConnector { bfx: Bitfinex::new(Some(api_key.into()), Some(api_secret.into())), @@ -164,7 +177,12 @@ impl Connector for BitfinexConnector { } async fn active_positions(&self, pair: &SymbolPair) -> Result>, BoxError> { - let active_positions = self.bfx.positions.active_positions().await?; + let (active_positions, _) = FutureRetry::new( + move || self.bfx.positions.active_positions(), + BitfinexConnector::handle_small_nonce_error, + ) + .await + .map_err(|(e, attempts)| e)?; let positions: Vec<_> = active_positions .into_iter() @@ -172,11 +190,8 @@ impl Connector for BitfinexConnector { .filter(|x: &Position| x.pair() == pair) .collect(); - if positions.is_empty() { - Ok(None) - } else { - Ok(Some(positions)) - } + trace!("\t[PositionManager] Retrieved positions for {}", pair); + Ok((!positions.is_empty()).then_some(positions)) } async fn current_prices(&self, pair: &SymbolPair) -> Result { @@ -188,7 +203,12 @@ impl Connector for BitfinexConnector { } async fn active_orders(&self, _: &SymbolPair) -> Result, BoxError> { - let response = self.bfx.orders.active_orders().await?; + let (response, _) = FutureRetry::new( + move || self.bfx.orders.active_orders(), + BitfinexConnector::handle_small_nonce_error, + ) + .await + .map_err(|(e, attempts)| e)?; Ok(response.iter().map(Into::into).collect()) } @@ -227,7 +247,22 @@ impl Connector for BitfinexConnector { BitfinexConnector::AFFILIATE_CODE.to_string(), )); - let response = self.bfx.orders.submit_order(&order_form).await?; + // retry to submit the order until it succeeds. + // the function may fail due to concurrent signed requests + // parsed in different times by the server + let response = { + loop { + match self.bfx.orders.submit_order(&order_form).await { + Ok(response) => break response, + Err(e) => { + if !e.to_string().contains("nonce: small") { + return Err(e); + } + tokio::time::sleep(Duration::from_nanos(1)).await; + } + } + } + }; Ok((&response).try_into()?) } @@ -235,13 +270,26 @@ impl Connector for BitfinexConnector { async fn order_book(&self, pair: &SymbolPair) -> Result { let symbol_name = BitfinexConnector::format_trading_pair(pair); - let x = self - .bfx - .book - .trading_pair(symbol_name, bitfinex::book::BookPrecision::P0) - .await?; + let response = { + loop { + match self + .bfx + .book + .trading_pair(symbol_name.clone(), bitfinex::book::BookPrecision::P0) + .await + { + Ok(response) => break response, + Err(e) => { + if !e.to_string().contains("nonce: small") { + return Err(e); + } + tokio::time::sleep(Duration::from_nanos(1)).await; + } + } + } + }; - let entries = x + let entries = response .into_iter() .map(|x| OrderBookEntry::Trading { price: x.price, @@ -256,7 +304,21 @@ impl Connector for BitfinexConnector { async fn cancel_order(&self, order: &ActiveOrder) -> Result { let cancel_form = order.into(); - Ok((&self.bfx.orders.cancel_order(&cancel_form).await?).try_into()?) + let response = { + loop { + match self.bfx.orders.cancel_order(&cancel_form).await { + Ok(response) => break response, + Err(e) => { + if !e.to_string().contains("nonce: small") { + return Err(e); + } + tokio::time::sleep(Duration::from_nanos(1)).await; + } + } + } + }; + + Ok((&response).try_into()?) } } diff --git a/rustybot/src/main.rs b/rustybot/src/main.rs index 1dde9fc..8a0d3e5 100644 --- a/rustybot/src/main.rs +++ b/rustybot/src/main.rs @@ -2,13 +2,12 @@ #![feature(bool_to_option)] use fern::colors::{Color, ColoredLevelConfig}; -use log::LevelFilter::Debug; +use log::LevelFilter::{Debug, Trace}; use tokio::time::Duration; use crate::bot::BfxBot; -use crate::connectors::ExchangeKind; +use crate::connectors::ExchangeDetails; use crate::currency::Symbol; -use crate::strategy::TrailingStop; mod bot; mod connectors; @@ -25,18 +24,31 @@ pub type BoxError = Box; async fn main() -> Result<(), BoxError> { setup_logger()?; + // TEST let test_api_key = "P1EVE68DJByDAkGQvpIkTwfrbYXd2Vo2ZaIhTYb9vx2"; let test_api_secret = "1nicg8z0zKVEt5Rb7ZDpIYjVYVTgvCaCPMZqB0niFli"; - let bitfinex = ExchangeKind::Bitfinex { - api_key: test_api_key.into(), - api_secret: test_api_secret.into(), + // REAL + // let orders_api_key = "hc5nDvYbFYJZMKdnzYq8P4AzCSwjxfQHnMyrg69Sf4c"; + // let orders_api_secret = "53x9goIOpbOtBoPi7dmigK5Cq5e0282EUO2qRIMEXlh"; + // let prices_api_key = "gTfFZUCwRBE0Z9FZjyk9HNe4lZ7XuiZY9rrW71SyUr9"; + // let prices_api_secret = "zWbxvoFZad3BPIiXK4DKfEvC0YsAuaApbeAyI8OBXgN"; + // let positions_api_key = "PfR7BadPZPNdVZnkHFBfAjsg7gjt8pAecMj5B8eRPFi"; + // let positions_api_secret = "izzvxtE3XsBBRpVCHGJ8f60UA56SmPNbBvJGVd67aqD"; + + let bitfinex = ExchangeDetails::Bitfinex { + prices_api_key: test_api_key.into(), + prices_api_secret: test_api_secret.into(), + orders_api_key: test_api_key.into(), + orders_api_secret: test_api_secret.into(), + positions_api_key: test_api_key.into(), + positions_api_secret: test_api_secret.into(), }; let mut bot = BfxBot::new( vec![bitfinex], - vec![Symbol::TESTBTC], - Symbol::TESTUSD, + vec![Symbol::BTC, Symbol::ETH, Symbol::XMR], + Symbol::USD, Duration::new(1, 0), ); @@ -60,7 +72,7 @@ fn setup_logger() -> Result<(), fern::InitError> { message )) }) - .level(Debug) + .level(Trace) .filter(|metadata| metadata.target().contains("rustybot")) .chain(std::io::stdout()) // .chain(fern::log_file("rustico.log")?) diff --git a/rustybot/src/managers.rs b/rustybot/src/managers.rs index 617f6fd..b0e02c8 100644 --- a/rustybot/src/managers.rs +++ b/rustybot/src/managers.rs @@ -184,7 +184,6 @@ impl PositionManagerHandle { .await?; let response = recv.await?; - println!("Got response: {:?}", response); Ok(response) } } @@ -228,8 +227,6 @@ impl PositionManager { _ => (None, None), }; - println!("Responding with {:?}", messages); - Ok(msg .respond_to .send((events, messages)) @@ -237,7 +234,7 @@ impl PositionManager { } pub async fn update(&mut self, tick: u64) -> Result { - // debug!("Updating {}", self.pair); + trace!("\t[PositionManager] Updating {}", self.pair); let opt_active_positions = self.client.active_positions(&self.pair).await?; self.current_tick = tick; @@ -283,7 +280,6 @@ impl PositionManager { .insert(self.current_tick(), pos_post_tick.clone()); self.active_position = Some(pos_post_tick); - println!("Returning: {:?}", messages); return Ok((events, messages)); } } @@ -408,15 +404,11 @@ impl OrderManager { info!("Closing position #{}", position_id); debug!("Retrieving open orders, positions and current prices..."); - // let (open_orders, order_book, open_positions) = tokio::join!( - // self.client.active_orders(&self.pair), - // self.client.order_book(&self.pair), - // self.client.active_positions(&self.pair) - // ); - - let open_orders = self.client.active_orders(&self.pair).await; - let order_book = self.client.order_book(&self.pair).await; - let open_positions = self.client.active_positions(&self.pair).await; + let (open_orders, order_book, open_positions) = tokio::join!( + self.client.active_orders(&self.pair), + self.client.order_book(&self.pair), + self.client.active_positions(&self.pair) + ); let open_orders = match open_orders { Ok(open_orders) => open_orders, @@ -518,13 +510,13 @@ impl OrderManager { // TODO: finish me pub async fn update(&self) -> Result { - trace!("\tUpdating {} order manager.", self.pair); - // - // let (open_orders, opt_open_positions) = tokio::join!( - // self.client.active_orders(&self.pair), - // self.client.active_positions(&self.pair) - // ); - // let (_open_orders, _opt_open_positions) = (open_orders?, opt_open_positions?); + trace!("\t[OrderManager] Updating {}", self.pair); + + let (open_orders, opt_open_positions) = tokio::join!( + self.client.active_orders(&self.pair), + self.client.active_positions(&self.pair) + ); + let (_open_orders, _opt_open_positions) = (open_orders?, opt_open_positions?); Ok((None, None)) } @@ -606,7 +598,6 @@ impl PairManager { messages.merge(opt_pos_messages); messages.merge(opt_order_messages); - println!("Messages: {:?}", messages); // TODO: to move into Handler? if let Some(messages) = messages { for m in messages {