initial support for websockets

This commit is contained in:
Giulio De Pasquale 2021-02-12 15:00:22 +00:00
parent 054b3b6659
commit 7014917acb
4 changed files with 133 additions and 197 deletions

232
rustybot/Cargo.lock generated
View File

@ -87,18 +87,6 @@ dependencies = [
"rustc-demangle",
]
[[package]]
name = "base64"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7"
[[package]]
name = "base64"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff"
[[package]]
name = "base64"
version = "0.13.0"
@ -120,8 +108,8 @@ dependencies = [
"serde",
"serde_derive",
"serde_json",
"tokio 1.1.0",
"tungstenite 0.9.2",
"tokio",
"tungstenite",
"url",
]
@ -131,34 +119,13 @@ version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "block-buffer"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b"
dependencies = [
"block-padding",
"byte-tools",
"byteorder",
"generic-array 0.12.3",
]
[[package]]
name = "block-buffer"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array 0.14.4",
]
[[package]]
name = "block-padding"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa79dedbb091f449f1f39e53edf88d5dbe95f895dae6135a8d7b881fb5af73f5"
dependencies = [
"byte-tools",
"generic-array",
]
[[package]]
@ -167,40 +134,18 @@ version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e8c087f005730276d1096a652e92a8bacee2e2472bcc9715a74d2bec38b5820"
[[package]]
name = "byte-tools"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7"
[[package]]
name = "byteorder"
version = "1.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de"
[[package]]
name = "bytes"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c"
dependencies = [
"byteorder",
"iovec",
]
[[package]]
name = "bytes"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
[[package]]
name = "bytes"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16"
[[package]]
name = "bytes"
version = "1.0.1"
@ -271,22 +216,13 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634"
[[package]]
name = "digest"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5"
dependencies = [
"generic-array 0.12.3",
]
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array 0.14.4",
"generic-array",
]
[[package]]
@ -314,12 +250,6 @@ dependencies = [
"version_check",
]
[[package]]
name = "fake-simd"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
[[package]]
name = "fern"
version = "0.6.0"
@ -438,7 +368,7 @@ checksum = "fde5a672a61f96552aa5ed9fd9c81c3fbdae4be9b1e205d6eaf17c83705adc0f"
dependencies = [
"futures",
"pin-project-lite",
"tokio 1.1.0",
"tokio",
]
[[package]]
@ -476,15 +406,6 @@ dependencies = [
"slab",
]
[[package]]
name = "generic-array"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec"
dependencies = [
"typenum",
]
[[package]]
name = "generic-array"
version = "0.14.4"
@ -534,10 +455,10 @@ dependencies = [
"futures-core",
"futures-sink",
"futures-util",
"http 0.2.2",
"http",
"indexmap",
"slab",
"tokio 1.1.0",
"tokio",
"tokio-util",
"tracing",
"tracing-futures",
@ -564,17 +485,6 @@ version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35"
[[package]]
name = "http"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6ccf5ede3a895d8856620237b2f02972c1bbc78d2965ad7fe8838d4a0ed41f0"
dependencies = [
"bytes 0.4.12",
"fnv",
"itoa",
]
[[package]]
name = "http"
version = "0.2.2"
@ -593,7 +503,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2861bd27ee074e5ee891e8b539837a9430012e249d7f0ca2d795650f579c1994"
dependencies = [
"bytes 1.0.1",
"http 0.2.2",
"http",
]
[[package]]
@ -619,14 +529,14 @@ dependencies = [
"futures-core",
"futures-util",
"h2",
"http 0.2.2",
"http",
"http-body",
"httparse",
"httpdate",
"itoa",
"pin-project 1.0.2",
"socket2",
"tokio 1.1.0",
"tokio",
"tower-service",
"tracing",
"want",
@ -641,7 +551,7 @@ dependencies = [
"bytes 1.0.1",
"hyper",
"native-tls",
"tokio 1.1.0",
"tokio",
"tokio-native-tls",
]
@ -668,20 +578,11 @@ dependencies = [
[[package]]
name = "input_buffer"
version = "0.2.0"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e1b822cc844905551931d6f81608ed5f50a79c1078a4e2b4d42dbc7c1eedfbf"
checksum = "f97967975f448f1a7ddb12b0bc41069d09ed6a1c161a92687e057325db35d413"
dependencies = [
"bytes 0.4.12",
]
[[package]]
name = "input_buffer"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19a8a95243d5a0398cae618ec29477c6e3cb631152be5c19481f80bc71559754"
dependencies = [
"bytes 0.5.6",
"bytes 1.0.1",
]
[[package]]
@ -693,15 +594,6 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "iovec"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
dependencies = [
"libc",
]
[[package]]
name = "ipnet"
version = "2.3.0"
@ -903,12 +795,6 @@ version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0"
[[package]]
name = "opaque-debug"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c"
[[package]]
name = "opaque-debug"
version = "0.3.0"
@ -1217,12 +1103,12 @@ version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd281b1030aa675fb90aa994d07187645bb3c8fc756ca766e7c3070b439de9de"
dependencies = [
"base64 0.13.0",
"base64",
"bytes 1.0.1",
"encoding_rs",
"futures-core",
"futures-util",
"http 0.2.2",
"http",
"http-body",
"hyper",
"hyper-tls",
@ -1236,7 +1122,7 @@ dependencies = [
"pin-project-lite",
"serde",
"serde_urlencoded",
"tokio 1.1.0",
"tokio",
"tokio-native-tls",
"url",
"wasm-bindgen",
@ -1282,8 +1168,9 @@ dependencies = [
"log 0.4.11",
"merge",
"regex",
"tokio 1.1.0",
"tokio",
"tokio-tungstenite",
"tungstenite",
]
[[package]]
@ -1371,29 +1258,17 @@ dependencies = [
"serde",
]
[[package]]
name = "sha-1"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d94d0bede923b3cea61f3f1ff57ff8cdfd77b400fb8f9998949e0cf04163df"
dependencies = [
"block-buffer 0.7.3",
"digest 0.8.1",
"fake-simd",
"opaque-debug 0.2.3",
]
[[package]]
name = "sha-1"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce3cdf1b5e620a498ee6f2a171885ac7e22f0e12089ec4b3d22b84921792507c"
dependencies = [
"block-buffer 0.9.0",
"block-buffer",
"cfg-if 1.0.0",
"cpuid-bool",
"digest 0.9.0",
"opaque-debug 0.3.0",
"digest",
"opaque-debug",
]
[[package]]
@ -1494,20 +1369,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "720ba21c25078711bf456d607987d95bce90f7c3bea5abe1db587862e7a1e87c"
dependencies = [
"autocfg",
"bytes 0.6.0",
"libc",
"memchr",
"mio",
"pin-project-lite",
]
[[package]]
name = "tokio"
version = "1.1.0"
@ -1546,7 +1407,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b"
dependencies = [
"native-tls",
"tokio 1.1.0",
"tokio",
]
[[package]]
@ -1557,20 +1418,20 @@ checksum = "76066865172052eb8796c686f0b441a93df8b08d40a950b062ffb9a426f00edd"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio 1.1.0",
"tokio",
]
[[package]]
name = "tokio-tungstenite"
version = "0.12.0"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0381c1e6e08908317cee104781ca48afe03f37cc857792b85f01f9828fb55ba3"
checksum = "e1a5f475f1b9d077ea1017ecbc60890fda8e54942d680ca0b1d2b47cfa2d861b"
dependencies = [
"futures-util",
"log 0.4.11",
"pin-project 1.0.2",
"tokio 0.3.6",
"tungstenite 0.11.1",
"tokio",
"tungstenite",
]
[[package]]
@ -1585,7 +1446,7 @@ dependencies = [
"futures-sink",
"log 0.4.11",
"pin-project-lite",
"tokio 1.1.0",
"tokio",
"tokio-stream",
]
@ -1633,39 +1494,20 @@ checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "tungstenite"
version = "0.9.2"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a0c2bd5aeb7dcd2bb32e472c8872759308495e5eccc942e929a513cd8d36110"
checksum = "8ada8297e8d70872fa9a551d93250a9f407beb9f37ef86494eb20012a2ff7c24"
dependencies = [
"base64 0.11.0",
"base64",
"byteorder",
"bytes 0.4.12",
"http 0.1.21",
"bytes 1.0.1",
"http",
"httparse",
"input_buffer 0.2.0",
"input_buffer",
"log 0.4.11",
"native-tls",
"rand 0.7.3",
"sha-1 0.8.2",
"url",
"utf-8",
]
[[package]]
name = "tungstenite"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0308d80d86700c5878b9ef6321f020f29b1bb9d5ff3cab25e75e23f3a492a23"
dependencies = [
"base64 0.12.3",
"byteorder",
"bytes 0.5.6",
"http 0.2.2",
"httparse",
"input_buffer 0.3.1",
"log 0.4.11",
"rand 0.7.3",
"sha-1 0.9.2",
"rand 0.8.2",
"sha-1",
"url",
"utf-8",
]

View File

@ -9,7 +9,6 @@ edition = "2018"
[dependencies]
bitfinex = { path= "/home/giulio/dev/bitfinex-rs" }
tokio = { version = "1", features=["full"]}
tokio-tungstenite = "*"
futures-util = { version = "0.3", default-features = false, features = ["async-await", "sink", "std"] }
async-trait = "0.1"
regex = "1"
@ -20,4 +19,6 @@ chrono = "0.4"
byteorder = "1"
float-cmp = "0.8"
merge = "0.1"
futures-retry = "0.6"
futures-retry = "0.6"
tungstenite = "0.12"
tokio-tungstenite = "0.13"

View File

@ -5,6 +5,7 @@ use tokio::time::sleep;
use crate::connectors::ExchangeDetails;
use crate::currency::{Symbol, SymbolPair};
use crate::frontend::FrontendManagerHandle;
use crate::managers::ExchangeManager;
use crate::ticker::Ticker;
use crate::BoxError;
@ -12,6 +13,7 @@ use crate::BoxError;
pub struct BfxBot {
ticker: Ticker,
exchange_managers: Vec<ExchangeManager>,
frontend_connector: FrontendManagerHandle,
}
impl BfxBot {
@ -34,6 +36,7 @@ impl BfxBot {
BfxBot {
ticker: Ticker::new(tick_duration),
exchange_managers,
frontend_connector: FrontendManagerHandle::new(),
}
}

90
rustybot/src/frontend.rs Normal file
View File

@ -0,0 +1,90 @@
use log::info;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use crate::events::{ActorMessage, Message};
use crate::BoxError;
use futures_util::stream::TryStreamExt;
use futures_util::StreamExt;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::accept_async;
#[derive(Debug)]
pub struct FrontendManager {
receiver: Receiver<ActorMessage>,
}
impl FrontendManager {
pub fn new(receiver: Receiver<ActorMessage>) -> Self {
Self { receiver }
}
async fn handle_ws_connection(stream: TcpStream, addr: SocketAddr) -> Result<(), BoxError> {
let mut websocket = accept_async(stream).await?;
info!("Received WebSocket connection <{:?}>", addr);
let (ws_out, ws_in) = websocket.split();
let on_received = ws_in.try_for_each(move |msg| {
info!(
"Received a message from {:?}: {}",
addr,
msg.to_text().unwrap()
);
futures_util::future::ok(())
});
tokio::spawn(on_received);
Ok(())
}
pub async fn websocket() -> Result<(), BoxError> {
let server = TcpListener::bind("127.0.0.1:3012").await?;
while let Ok((stream, addr)) = server.accept().await {
tokio::spawn(FrontendManager::handle_ws_connection(stream, addr));
}
Ok(())
}
pub async fn handle_message(&mut self, message: ActorMessage) -> Result<(), BoxError> {
match message.message {
_ => {}
}
Ok(message
.respond_to
.send((None, None))
.map_err(|_| BoxError::from("Could not send message."))?)
}
}
pub struct FrontendManagerHandle {
sender: Sender<ActorMessage>,
}
impl FrontendManagerHandle {
async fn run_frontend_manager(mut manager: FrontendManager) {
info!("Frontend handler ready");
while let Some(msg) = manager.receiver.recv().await {
manager.handle_message(msg).await.unwrap();
}
}
pub fn new() -> Self {
let (sender, receiver) = channel(1);
let frontend = FrontendManager::new(receiver);
tokio::spawn(FrontendManager::websocket());
tokio::spawn(FrontendManagerHandle::run_frontend_manager(frontend));
Self { sender }
}
}