retry signed requests

This commit is contained in:
Giulio De Pasquale 2021-01-26 11:15:04 +00:00
parent f66d7ef142
commit 307bbb1b0c
5 changed files with 202 additions and 61 deletions

103
rustybot/Cargo.lock generated
View File

@ -371,46 +371,108 @@ dependencies = [
]
[[package]]
name = "futures-channel"
version = "0.3.8"
name = "futures"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b7109687aa4e177ef6fe84553af6280ef2778bdb7783ba44c9dc3399110fe64"
checksum = "da9052a1a50244d8d5aa9bf55cbc2fb6f357c86cc52e46c62ed390a7180cf150"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2d31b7ec7efab6eefc7c57233bb10b847986139d88cc2f5a02a1ae6871a1846"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.8"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "847ce131b72ffb13b6109a221da9ad97a64cbe48feb1028356b836b47b8f1748"
checksum = "79e5145dde8da7d1b3892dad07a9c98fc04bc39892b1ecc9692cf53e2b780a65"
[[package]]
name = "futures-executor"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9e59fdc009a4b3096bf94f740a0f2424c082521f20a9b08c5c07c48d90fd9b9"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28be053525281ad8259d47e4de5de657b25e7bac113458555bb4b70bc6870500"
[[package]]
name = "futures-macro"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c287d25add322d9f9abdcdc5927ca398917996600182178774032e9f8258fedd"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-retry"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fde5a672a61f96552aa5ed9fd9c81c3fbdae4be9b1e205d6eaf17c83705adc0f"
dependencies = [
"futures",
"pin-project-lite",
"tokio 1.1.0",
]
[[package]]
name = "futures-sink"
version = "0.3.8"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f878195a49cee50e006b02b93cf7e0a95a38ac7b776b4c4d9cc1207cd20fcb3d"
checksum = "caf5c69029bda2e743fddd0582d1083951d65cc9539aebf8812f36c3491342d6"
[[package]]
name = "futures-task"
version = "0.3.8"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c554eb5bf48b2426c4771ab68c6b14468b6e76cc90996f528c3338d761a4d0d"
checksum = "13de07eb8ea81ae445aca7b69f5f7bf15d7bf4912d8ca37d6645c77ae8a58d86"
dependencies = [
"once_cell",
]
[[package]]
name = "futures-util"
version = "0.3.8"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d304cff4a7b99cfb7986f7d43fbe93d175e72e704a8860787cc95e9ffd85cbd2"
checksum = "632a8cd0f2a4b3fdea1657f08bde063848c3bd00f9bbf6e256b8be78802e624b"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"pin-project 1.0.2",
"memchr",
"pin-project-lite",
"pin-utils",
"proc-macro-hack",
"proc-macro-nested",
"slab",
]
@ -959,9 +1021,9 @@ dependencies = [
[[package]]
name = "pin-project-lite"
version = "0.2.0"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b063f57ec186e6140e2b8b6921e5f1bd89c7356dda5b33acc5401203ca6131c"
checksum = "439697af366c49a6d0a010c56a0d97685bc140ce0d377b13a2ea2aa42d64a827"
[[package]]
name = "pin-utils"
@ -1005,6 +1067,18 @@ dependencies = [
"version_check",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro-nested"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"
[[package]]
name = "proc-macro2"
version = "1.0.24"
@ -1203,6 +1277,7 @@ dependencies = [
"dyn-clone",
"fern",
"float-cmp",
"futures-retry",
"futures-util",
"log 0.4.11",
"merge",

View File

@ -20,3 +20,4 @@ chrono = "0.4"
byteorder = "1"
float-cmp = "0.8"
merge = "0.1"
futures-retry = "0.6"

View File

@ -7,6 +7,10 @@ use async_trait::async_trait;
use bitfinex::api::Bitfinex;
use bitfinex::orders::{CancelOrderForm, OrderMeta, OrderResponse};
use bitfinex::ticker::TradingPairTicker;
use futures_retry::{FutureRetry, RetryPolicy, StreamRetryExt};
use futures_util::task::FutureObj;
use log::trace;
use tokio::time::Duration;
use crate::currency::SymbolPair;
use crate::models::{
@ -14,6 +18,8 @@ use crate::models::{
PriceTicker, TradingPlatform,
};
use crate::BoxError;
use std::error::Error;
use tokio::macros::support::Future;
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub enum Exchange {
@ -142,6 +148,13 @@ pub struct BitfinexConnector {
impl BitfinexConnector {
const AFFILIATE_CODE: &'static str = "XPebOgHxA";
fn handle_small_nonce_error(e: BoxError) -> RetryPolicy<BoxError> {
if e.to_string().contains("nonce: small") {
return RetryPolicy::WaitRetry(Duration::from_millis(1));
}
return RetryPolicy::ForwardError(e);
}
pub fn new(api_key: &str, api_secret: &str) -> Self {
BitfinexConnector {
bfx: Bitfinex::new(Some(api_key.into()), Some(api_secret.into())),
@ -164,7 +177,12 @@ impl Connector for BitfinexConnector {
}
async fn active_positions(&self, pair: &SymbolPair) -> Result<Option<Vec<Position>>, BoxError> {
let active_positions = self.bfx.positions.active_positions().await?;
let (active_positions, _) = FutureRetry::new(
move || self.bfx.positions.active_positions(),
BitfinexConnector::handle_small_nonce_error,
)
.await
.map_err(|(e, attempts)| e)?;
let positions: Vec<_> = active_positions
.into_iter()
@ -172,11 +190,8 @@ impl Connector for BitfinexConnector {
.filter(|x: &Position| x.pair() == pair)
.collect();
if positions.is_empty() {
Ok(None)
} else {
Ok(Some(positions))
}
trace!("\t[PositionManager] Retrieved positions for {}", pair);
Ok((!positions.is_empty()).then_some(positions))
}
async fn current_prices(&self, pair: &SymbolPair) -> Result<TradingPairTicker, BoxError> {
@ -188,7 +203,12 @@ impl Connector for BitfinexConnector {
}
async fn active_orders(&self, _: &SymbolPair) -> Result<Vec<ActiveOrder>, BoxError> {
let response = self.bfx.orders.active_orders().await?;
let (response, _) = FutureRetry::new(
move || self.bfx.orders.active_orders(),
BitfinexConnector::handle_small_nonce_error,
)
.await
.map_err(|(e, attempts)| e)?;
Ok(response.iter().map(Into::into).collect())
}
@ -227,7 +247,22 @@ impl Connector for BitfinexConnector {
BitfinexConnector::AFFILIATE_CODE.to_string(),
));
let response = self.bfx.orders.submit_order(&order_form).await?;
// retry to submit the order until it succeeds.
// the function may fail due to concurrent signed requests
// parsed in different times by the server
let response = {
loop {
match self.bfx.orders.submit_order(&order_form).await {
Ok(response) => break response,
Err(e) => {
if !e.to_string().contains("nonce: small") {
return Err(e);
}
tokio::time::sleep(Duration::from_nanos(1)).await;
}
}
}
};
Ok((&response).try_into()?)
}
@ -235,13 +270,26 @@ impl Connector for BitfinexConnector {
async fn order_book(&self, pair: &SymbolPair) -> Result<OrderBook, BoxError> {
let symbol_name = BitfinexConnector::format_trading_pair(pair);
let x = self
.bfx
.book
.trading_pair(symbol_name, bitfinex::book::BookPrecision::P0)
.await?;
let response = {
loop {
match self
.bfx
.book
.trading_pair(symbol_name.clone(), bitfinex::book::BookPrecision::P0)
.await
{
Ok(response) => break response,
Err(e) => {
if !e.to_string().contains("nonce: small") {
return Err(e);
}
tokio::time::sleep(Duration::from_nanos(1)).await;
}
}
}
};
let entries = x
let entries = response
.into_iter()
.map(|x| OrderBookEntry::Trading {
price: x.price,
@ -256,7 +304,21 @@ impl Connector for BitfinexConnector {
async fn cancel_order(&self, order: &ActiveOrder) -> Result<ActiveOrder, BoxError> {
let cancel_form = order.into();
Ok((&self.bfx.orders.cancel_order(&cancel_form).await?).try_into()?)
let response = {
loop {
match self.bfx.orders.cancel_order(&cancel_form).await {
Ok(response) => break response,
Err(e) => {
if !e.to_string().contains("nonce: small") {
return Err(e);
}
tokio::time::sleep(Duration::from_nanos(1)).await;
}
}
}
};
Ok((&response).try_into()?)
}
}

View File

@ -2,13 +2,12 @@
#![feature(bool_to_option)]
use fern::colors::{Color, ColoredLevelConfig};
use log::LevelFilter::Debug;
use log::LevelFilter::{Debug, Trace};
use tokio::time::Duration;
use crate::bot::BfxBot;
use crate::connectors::ExchangeKind;
use crate::connectors::ExchangeDetails;
use crate::currency::Symbol;
use crate::strategy::TrailingStop;
mod bot;
mod connectors;
@ -25,18 +24,31 @@ pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
async fn main() -> Result<(), BoxError> {
setup_logger()?;
// TEST
let test_api_key = "P1EVE68DJByDAkGQvpIkTwfrbYXd2Vo2ZaIhTYb9vx2";
let test_api_secret = "1nicg8z0zKVEt5Rb7ZDpIYjVYVTgvCaCPMZqB0niFli";
let bitfinex = ExchangeKind::Bitfinex {
api_key: test_api_key.into(),
api_secret: test_api_secret.into(),
// REAL
// let orders_api_key = "hc5nDvYbFYJZMKdnzYq8P4AzCSwjxfQHnMyrg69Sf4c";
// let orders_api_secret = "53x9goIOpbOtBoPi7dmigK5Cq5e0282EUO2qRIMEXlh";
// let prices_api_key = "gTfFZUCwRBE0Z9FZjyk9HNe4lZ7XuiZY9rrW71SyUr9";
// let prices_api_secret = "zWbxvoFZad3BPIiXK4DKfEvC0YsAuaApbeAyI8OBXgN";
// let positions_api_key = "PfR7BadPZPNdVZnkHFBfAjsg7gjt8pAecMj5B8eRPFi";
// let positions_api_secret = "izzvxtE3XsBBRpVCHGJ8f60UA56SmPNbBvJGVd67aqD";
let bitfinex = ExchangeDetails::Bitfinex {
prices_api_key: test_api_key.into(),
prices_api_secret: test_api_secret.into(),
orders_api_key: test_api_key.into(),
orders_api_secret: test_api_secret.into(),
positions_api_key: test_api_key.into(),
positions_api_secret: test_api_secret.into(),
};
let mut bot = BfxBot::new(
vec![bitfinex],
vec![Symbol::TESTBTC],
Symbol::TESTUSD,
vec![Symbol::BTC, Symbol::ETH, Symbol::XMR],
Symbol::USD,
Duration::new(1, 0),
);
@ -60,7 +72,7 @@ fn setup_logger() -> Result<(), fern::InitError> {
message
))
})
.level(Debug)
.level(Trace)
.filter(|metadata| metadata.target().contains("rustybot"))
.chain(std::io::stdout())
// .chain(fern::log_file("rustico.log")?)

View File

@ -184,7 +184,6 @@ impl PositionManagerHandle {
.await?;
let response = recv.await?;
println!("Got response: {:?}", response);
Ok(response)
}
}
@ -228,8 +227,6 @@ impl PositionManager {
_ => (None, None),
};
println!("Responding with {:?}", messages);
Ok(msg
.respond_to
.send((events, messages))
@ -237,7 +234,7 @@ impl PositionManager {
}
pub async fn update(&mut self, tick: u64) -> Result<OptionUpdate, BoxError> {
// debug!("Updating {}", self.pair);
trace!("\t[PositionManager] Updating {}", self.pair);
let opt_active_positions = self.client.active_positions(&self.pair).await?;
self.current_tick = tick;
@ -283,7 +280,6 @@ impl PositionManager {
.insert(self.current_tick(), pos_post_tick.clone());
self.active_position = Some(pos_post_tick);
println!("Returning: {:?}", messages);
return Ok((events, messages));
}
}
@ -408,15 +404,11 @@ impl OrderManager {
info!("Closing position #{}", position_id);
debug!("Retrieving open orders, positions and current prices...");
// let (open_orders, order_book, open_positions) = tokio::join!(
// self.client.active_orders(&self.pair),
// self.client.order_book(&self.pair),
// self.client.active_positions(&self.pair)
// );
let open_orders = self.client.active_orders(&self.pair).await;
let order_book = self.client.order_book(&self.pair).await;
let open_positions = self.client.active_positions(&self.pair).await;
let (open_orders, order_book, open_positions) = tokio::join!(
self.client.active_orders(&self.pair),
self.client.order_book(&self.pair),
self.client.active_positions(&self.pair)
);
let open_orders = match open_orders {
Ok(open_orders) => open_orders,
@ -518,13 +510,13 @@ impl OrderManager {
// TODO: finish me
pub async fn update(&self) -> Result<OptionUpdate, BoxError> {
trace!("\tUpdating {} order manager.", self.pair);
//
// let (open_orders, opt_open_positions) = tokio::join!(
// self.client.active_orders(&self.pair),
// self.client.active_positions(&self.pair)
// );
// let (_open_orders, _opt_open_positions) = (open_orders?, opt_open_positions?);
trace!("\t[OrderManager] Updating {}", self.pair);
let (open_orders, opt_open_positions) = tokio::join!(
self.client.active_orders(&self.pair),
self.client.active_positions(&self.pair)
);
let (_open_orders, _opt_open_positions) = (open_orders?, opt_open_positions?);
Ok((None, None))
}
@ -606,7 +598,6 @@ impl PairManager {
messages.merge(opt_pos_messages);
messages.merge(opt_order_messages);
println!("Messages: {:?}", messages);
// TODO: to move into Handler?
if let Some(messages) = messages {
for m in messages {