From 3f5e6e3a70891ce114ac8b60ac829021ba583416 Mon Sep 17 00:00:00 2001 From: Giulio De Pasquale Date: Thu, 18 Feb 2021 09:37:48 +0000 Subject: [PATCH] added frontend components --- rustybot/Cargo.lock | 91 +++++++++------------------------------- rustybot/Cargo.toml | 3 +- rustybot/src/bot.rs | 3 ++ rustybot/src/frontend.rs | 90 +++++++++++++++++++++++++++++++++++++++ rustybot/src/main.rs | 11 ++--- 5 files changed, 120 insertions(+), 78 deletions(-) create mode 100644 rustybot/src/frontend.rs diff --git a/rustybot/Cargo.lock b/rustybot/Cargo.lock index 82c4d3c..1f0ae4b 100644 --- a/rustybot/Cargo.lock +++ b/rustybot/Cargo.lock @@ -87,12 +87,6 @@ dependencies = [ "rustc-demangle", ] -[[package]] -name = "base64" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff" - [[package]] name = "base64" version = "0.13.0" @@ -114,8 +108,8 @@ dependencies = [ "serde", "serde_derive", "serde_json", - "tokio 1.1.0", - "tungstenite 0.12.0", + "tokio", + "tungstenite", "url", ] @@ -152,12 +146,6 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" -[[package]] -name = "bytes" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16" - [[package]] name = "bytes" version = "1.0.1" @@ -386,7 +374,7 @@ checksum = "fde5a672a61f96552aa5ed9fd9c81c3fbdae4be9b1e205d6eaf17c83705adc0f" dependencies = [ "futures", "pin-project-lite", - "tokio 1.1.0", + "tokio", ] [[package]] @@ -476,7 +464,7 @@ dependencies = [ "http", "indexmap", "slab", - "tokio 1.1.0", + "tokio", "tokio-util", "tracing", "tracing-futures", @@ -554,7 +542,7 @@ dependencies = [ "itoa", "pin-project 1.0.2", "socket2", - "tokio 1.1.0", + "tokio", "tower-service", "tracing", "want", @@ -569,7 +557,7 @@ dependencies = [ "bytes 1.0.1", "hyper", "native-tls", - "tokio 1.1.0", + "tokio", "tokio-native-tls", ] @@ -594,15 +582,6 @@ dependencies = [ "hashbrown", ] -[[package]] -name = "input_buffer" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19a8a95243d5a0398cae618ec29477c6e3cb631152be5c19481f80bc71559754" -dependencies = [ - "bytes 0.5.6", -] - [[package]] name = "input_buffer" version = "0.4.0" @@ -1130,7 +1109,7 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd281b1030aa675fb90aa994d07187645bb3c8fc756ca766e7c3070b439de9de" dependencies = [ - "base64 0.13.0", + "base64", "bytes 1.0.1", "encoding_rs", "futures-core", @@ -1149,7 +1128,7 @@ dependencies = [ "pin-project-lite", "serde", "serde_urlencoded", - "tokio 1.1.0", + "tokio", "tokio-native-tls", "url", "wasm-bindgen", @@ -1196,8 +1175,9 @@ dependencies = [ "log 0.4.11", "merge", "regex", - "tokio 1.1.0", + "tokio", "tokio-tungstenite", + "tungstenite", ] [[package]] @@ -1396,20 +1376,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" -[[package]] -name = "tokio" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "720ba21c25078711bf456d607987d95bce90f7c3bea5abe1db587862e7a1e87c" -dependencies = [ - "autocfg", - "bytes 0.6.0", - "libc", - "memchr", - "mio", - "pin-project-lite", -] - [[package]] name = "tokio" version = "1.1.0" @@ -1448,7 +1414,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" dependencies = [ "native-tls", - "tokio 1.1.0", + "tokio", ] [[package]] @@ -1459,20 +1425,20 @@ checksum = "76066865172052eb8796c686f0b441a93df8b08d40a950b062ffb9a426f00edd" dependencies = [ "futures-core", "pin-project-lite", - "tokio 1.1.0", + "tokio", ] [[package]] name = "tokio-tungstenite" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0381c1e6e08908317cee104781ca48afe03f37cc857792b85f01f9828fb55ba3" +checksum = "e1a5f475f1b9d077ea1017ecbc60890fda8e54942d680ca0b1d2b47cfa2d861b" dependencies = [ "futures-util", "log 0.4.11", "pin-project 1.0.2", - "tokio 0.3.6", - "tungstenite 0.11.1", + "tokio", + "tungstenite", ] [[package]] @@ -1487,7 +1453,7 @@ dependencies = [ "futures-sink", "log 0.4.11", "pin-project-lite", - "tokio 1.1.0", + "tokio", "tokio-stream", ] @@ -1533,37 +1499,18 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" -[[package]] -name = "tungstenite" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0308d80d86700c5878b9ef6321f020f29b1bb9d5ff3cab25e75e23f3a492a23" -dependencies = [ - "base64 0.12.3", - "byteorder", - "bytes 0.5.6", - "http", - "httparse", - "input_buffer 0.3.1", - "log 0.4.11", - "rand 0.7.3", - "sha-1", - "url", - "utf-8", -] - [[package]] name = "tungstenite" version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ada8297e8d70872fa9a551d93250a9f407beb9f37ef86494eb20012a2ff7c24" dependencies = [ - "base64 0.13.0", + "base64", "byteorder", "bytes 1.0.1", "http", "httparse", - "input_buffer 0.4.0", + "input_buffer", "log 0.4.11", "native-tls", "rand 0.8.2", diff --git a/rustybot/Cargo.toml b/rustybot/Cargo.toml index 5f31416..fcc81d4 100644 --- a/rustybot/Cargo.toml +++ b/rustybot/Cargo.toml @@ -9,7 +9,6 @@ edition = "2018" [dependencies] bitfinex = { path= "/home/giulio/dev/bitfinex-rs" } tokio = { version = "1", features=["full"]} -tokio-tungstenite = "*" futures-util = { version = "0.3", default-features = false, features = ["async-await", "sink", "std"] } async-trait = "0.1" regex = "1" @@ -21,4 +20,6 @@ byteorder = "1" float-cmp = "0.8" merge = "0.1" futures-retry = "0.6" +tungstenite = "0.12" +tokio-tungstenite = "0.13" dotenv = "0.15" \ No newline at end of file diff --git a/rustybot/src/bot.rs b/rustybot/src/bot.rs index f9742dc..91f3ff0 100644 --- a/rustybot/src/bot.rs +++ b/rustybot/src/bot.rs @@ -5,6 +5,7 @@ use tokio::time::sleep; use crate::connectors::ExchangeDetails; use crate::currency::{Symbol, SymbolPair}; +use crate::frontend::FrontendManagerHandle; use crate::managers::ExchangeManager; use crate::ticker::Ticker; use crate::BoxError; @@ -12,6 +13,7 @@ use crate::BoxError; pub struct BfxBot { ticker: Ticker, exchange_managers: Vec, + frontend_connector: FrontendManagerHandle, } impl BfxBot { @@ -34,6 +36,7 @@ impl BfxBot { BfxBot { ticker: Ticker::new(tick_duration), exchange_managers, + frontend_connector: FrontendManagerHandle::new(), } } diff --git a/rustybot/src/frontend.rs b/rustybot/src/frontend.rs new file mode 100644 index 0000000..7b86c7e --- /dev/null +++ b/rustybot/src/frontend.rs @@ -0,0 +1,90 @@ +use log::info; +use tokio::sync::mpsc::{channel, Receiver, Sender}; + +use crate::events::{ActorMessage}; +use crate::BoxError; +use futures_util::stream::TryStreamExt; +use futures_util::StreamExt; + +use std::net::SocketAddr; + +use tokio::net::{TcpListener, TcpStream}; +use tokio_tungstenite::accept_async; + +#[derive(Debug)] +pub struct FrontendManager { + receiver: Receiver, +} + +impl FrontendManager { + pub fn new(receiver: Receiver) -> Self { + Self { receiver } + } + + async fn handle_ws_connection(stream: TcpStream, addr: SocketAddr) -> Result<(), BoxError> { + let websocket = accept_async(stream).await?; + info!("Received WebSocket connection <{:?}>", addr); + + let (_, ws_in) = websocket.split(); + + let on_received = ws_in.try_for_each(move |msg| { + info!( + "Received a message from {:?}: {}", + addr, + msg.to_text().unwrap() + ); + + futures_util::future::ok(()) + }); + + tokio::spawn(on_received); + + Ok(()) + } + + pub async fn websocket() -> Result<(), BoxError> { + let server = TcpListener::bind("127.0.0.1:3012").await?; + + while let Ok((stream, addr)) = server.accept().await { + tokio::spawn(FrontendManager::handle_ws_connection(stream, addr)); + } + + Ok(()) + } + + pub async fn handle_message(&mut self, message: ActorMessage) -> Result<(), BoxError> { + match message.message { + _ => {} + } + + Ok(message + .respond_to + .send((None, None)) + .map_err(|_| BoxError::from("Could not send message."))?) + } +} + +pub struct FrontendManagerHandle { + sender: Sender, +} + +impl FrontendManagerHandle { + // async fn run_frontend_manager(mut manager: FrontendManager) { + // info!("Frontend handler ready"); + // + // while let Some(msg) = manager.receiver.recv().await { + // manager.handle_message(msg).await.unwrap(); + // } + // } + + pub fn new() -> Self { + let (sender, receiver) = channel(1); + + let _frontend = FrontendManager::new(receiver); + + tokio::spawn(FrontendManager::websocket()); + // tokio::spawn(FrontendManagerHandle::run_frontend_manager(frontend)); + + Self { sender } + } +} diff --git a/rustybot/src/main.rs b/rustybot/src/main.rs index 3f76b66..3adc4d8 100644 --- a/rustybot/src/main.rs +++ b/rustybot/src/main.rs @@ -4,7 +4,7 @@ use std::env; use fern::colors::{Color, ColoredLevelConfig}; -use log::LevelFilter::{Debug, Trace}; +use log::LevelFilter::{Trace}; use tokio::time::Duration; use crate::bot::BfxBot; @@ -15,6 +15,7 @@ mod bot; mod connectors; mod currency; mod events; +mod frontend; mod managers; mod models; mod strategy; @@ -28,12 +29,12 @@ async fn main() -> Result<(), BoxError> { dotenv::dotenv()?; let api_key = env::vars() - .find(|(k, v)| k == "API_KEY") - .map(|(k, v)| v) + .find(|(k, _v)| k == "API_KEY") + .map(|(_k, v)| v) .ok_or("API_KEY not set!")?; let api_secret = env::vars() - .find(|(k, v)| k == "API_SECRET") - .map(|(k, v)| v) + .find(|(k, _v)| k == "API_SECRET") + .map(|(_k, v)| v) .ok_or("API_SECRET not set!")?; let bitfinex = ExchangeDetails::Bitfinex {