diff --git a/crates/transport/src/actors/worker.rs b/crates/transport/src/actors/worker.rs index 15f0a02..35dc6b3 100644 --- a/crates/transport/src/actors/worker.rs +++ b/crates/transport/src/actors/worker.rs @@ -11,32 +11,18 @@ use libp2p_swarm_derive::NetworkBehaviour; use serde::{Deserialize, Serialize}; use tokio_util::sync::CancellationToken; -use sqd_messages::{LogsRequest, Query, QueryLogs, QueryResult, WorkerStatus}; +use sqd_messages::{LogsRequest, QueryLogs, WorkerStatus}; use crate::{ QueueFull, behaviour::{ base::{BaseBehaviour, BaseBehaviourEvent}, noise::NoiseBehaviour, request_server::{Request, ServerBehaviour}, wrapped::{BehaviourWrapper, TToSwarm, Wrapped} }, codec::ProtoCodec, protocol::{ - MAX_HEARTBEAT_SIZE, MAX_LOGS_REQUEST_SIZE, MAX_LOGS_RESPONSE_SIZE, MAX_QUERY_MSG_SIZE, MAX_QUERY_RESULT_SIZE, QUERY_PROTOCOL, SQL_QUERY_PROTOCOL, WORKER_LOGS_PROTOCOL, WORKER_STATUS_PROTOCOL + MAX_HEARTBEAT_SIZE, MAX_LOGS_REQUEST_SIZE, MAX_LOGS_RESPONSE_SIZE, WORKER_LOGS_PROTOCOL, WORKER_STATUS_PROTOCOL }, record_event, util::{DEFAULT_SHUTDOWN_TIMEOUT, Receiver, Sender, TaskManager, new_queue} }; #[derive(Debug)] pub enum WorkerEvent { - /// Query received from a portal - Query { - peer_id: PeerId, - query: Query, - /// If this channel is dropped, the connection will be closed - resp_chan: ResponseChannel, - }, - /// SQLQuery received from a portal - SqlQuery { - peer_id: PeerId, - query: Query, - /// If this channel is dropped, the connection will be closed - resp_chan: ResponseChannel, - }, /// Logs requested by a collector LogsRequest { request: LogsRequest, @@ -50,16 +36,13 @@ pub enum WorkerEvent { }, } -type QueryBehaviour = Wrapped>>; -type SqlQueryBehaviour = Wrapped>>; type LogsBehaviour = Wrapped>>; type StatusBehaviour = Wrapped>>; #[derive(NetworkBehaviour)] pub struct InnerBehaviour { base: Wrapped, - query: QueryBehaviour, - sql_query: SqlQueryBehaviour, + query_streams: libp2p_stream::Behaviour, logs: LogsBehaviour, status: StatusBehaviour, noise: NoiseBehaviour, @@ -67,9 +50,6 @@ pub struct InnerBehaviour { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkerConfig { - pub heartbeats_queue_size: usize, - pub query_results_queue_size: usize, - pub sql_query_results_queue_size: usize, pub logs_queue_size: usize, pub status_queue_size: usize, pub events_queue_size: usize, @@ -81,9 +61,6 @@ pub struct WorkerConfig { impl Default for WorkerConfig { fn default() -> Self { Self { - heartbeats_queue_size: 100, - query_results_queue_size: 100, - sql_query_results_queue_size: 100, logs_queue_size: 1, status_queue_size: 10, events_queue_size: 100, @@ -99,23 +76,16 @@ pub struct WorkerBehaviour { } impl WorkerBehaviour { - pub fn new(mut base: BaseBehaviour, config: &WorkerConfig) -> Wrapped { + pub fn new( + mut base: BaseBehaviour, + config: &WorkerConfig, + query_streams: libp2p_stream::Behaviour, + ) -> Wrapped { base.set_server_mode(); Self { inner: InnerBehaviour { base: base.into(), - query: ServerBehaviour::new( - ProtoCodec::new(MAX_QUERY_MSG_SIZE, MAX_QUERY_RESULT_SIZE), - QUERY_PROTOCOL, - config.query_execution_timeout, - ) - .into(), - sql_query: ServerBehaviour::new( - ProtoCodec::new(MAX_QUERY_MSG_SIZE, MAX_QUERY_RESULT_SIZE), - SQL_QUERY_PROTOCOL, - config.query_execution_timeout, - ) - .into(), + query_streams, logs: ServerBehaviour::new( ProtoCodec::new(MAX_LOGS_REQUEST_SIZE, MAX_LOGS_RESPONSE_SIZE), WORKER_LOGS_PROTOCOL, @@ -138,68 +108,6 @@ impl WorkerBehaviour { None } - fn on_query( - &mut self, - peer_id: PeerId, - query: Query, - resp_chan: ResponseChannel, - ) -> Option { - // Drop empty messages - if query == Query::default() { - None - } else { - Some(WorkerEvent::Query { - peer_id, - query, - resp_chan, - }) - } - } - - fn on_sql_query( - &mut self, - peer_id: PeerId, - query: Query, - resp_chan: ResponseChannel, - ) -> Option { - // Drop empty messages - if query == Query::default() { - None - } else { - Some(WorkerEvent::SqlQuery { - peer_id, - query, - resp_chan, - }) - } - } - - pub fn send_query_result( - &mut self, - result: QueryResult, - resp_chan: ResponseChannel, - ) { - log::debug!("Sending query result {result:?}"); - - self.inner - .query - .try_send_response(resp_chan, result) - .unwrap_or_else(|e| log::error!("Cannot send result for query {}", e.query_id)); - } - - pub fn send_sql_query_result( - &mut self, - result: QueryResult, - resp_chan: ResponseChannel, - ) { - log::debug!("Sending sql query result {result:?}"); - - self.inner - .sql_query - .try_send_response(resp_chan, result) - .unwrap_or_else(|e| log::error!("Cannot send sql result for query {}", e.query_id)); - } - fn on_logs_request( &mut self, _peer_id: PeerId, @@ -252,16 +160,7 @@ impl BehaviourWrapper for WorkerBehaviour { ) -> impl IntoIterator> { let ev = match ev { InnerBehaviourEvent::Base(ev) => self.on_base_event(ev), - InnerBehaviourEvent::Query(Request { - peer_id, - request, - response_channel, - }) => self.on_query(peer_id, request, response_channel), - InnerBehaviourEvent::SqlQuery(Request { - peer_id, - request, - response_channel, - }) => self.on_sql_query(peer_id, request, response_channel), + InnerBehaviourEvent::QueryStreams(()) => None, InnerBehaviourEvent::Logs(Request { peer_id, request, @@ -279,8 +178,6 @@ impl BehaviourWrapper for WorkerBehaviour { struct WorkerTransport { swarm: Swarm>, - query_results_rx: Receiver<(QueryResult, ResponseChannel)>, - sql_query_results_rx: Receiver<(QueryResult, ResponseChannel)>, logs_rx: Receiver<(QueryLogs, ResponseChannel)>, status_rx: Receiver<(WorkerStatus, ResponseChannel)>, events_tx: Sender, @@ -293,8 +190,6 @@ impl WorkerTransport { tokio::select! { _ = cancel_token.cancelled() => break, ev = self.swarm.select_next_some() => self.on_swarm_event(ev), - Some((res, resp_chan)) = self.query_results_rx.recv() => self.swarm.behaviour_mut().send_query_result(res, resp_chan), - Some((res, resp_chan)) = self.sql_query_results_rx.recv() => self.swarm.behaviour_mut().send_sql_query_result(res, resp_chan), Some((logs, resp_chan)) = self.logs_rx.recv() => self.swarm.behaviour_mut().send_logs(logs, resp_chan), Some((status, resp_chan)) = self.status_rx.recv() => self.swarm.behaviour_mut().send_status(status, resp_chan), } @@ -313,8 +208,6 @@ impl WorkerTransport { #[derive(Clone)] pub struct WorkerTransportHandle { - query_results_tx: Sender<(QueryResult, ResponseChannel)>, - sql_query_results_tx: Sender<(QueryResult, ResponseChannel)>, logs_tx: Sender<(QueryLogs, ResponseChannel)>, status_tx: Sender<(WorkerStatus, ResponseChannel)>, _task_manager: Arc, // This ensures that transport is stopped when the last handle is dropped @@ -322,8 +215,6 @@ pub struct WorkerTransportHandle { impl WorkerTransportHandle { fn new( - query_results_tx: Sender<(QueryResult, ResponseChannel)>, - sql_query_results_tx: Sender<(QueryResult, ResponseChannel)>, logs_tx: Sender<(QueryLogs, ResponseChannel)>, status_tx: Sender<(WorkerStatus, ResponseChannel)>, transport: WorkerTransport, @@ -332,32 +223,12 @@ impl WorkerTransportHandle { let mut task_manager = TaskManager::new(shutdown_timeout); task_manager.spawn(|c| transport.run(c)); Self { - query_results_tx, - sql_query_results_tx, logs_tx, status_tx, _task_manager: Arc::new(task_manager), } } - pub fn send_query_result( - &self, - result: QueryResult, - resp_chan: ResponseChannel, - ) -> Result<(), QueueFull> { - log::debug!("Queueing query result {result:?}"); - self.query_results_tx.try_send((result, resp_chan)) - } - - pub fn send_sql_query_result( - &self, - result: QueryResult, - resp_chan: ResponseChannel, - ) -> Result<(), QueueFull> { - log::debug!("Queueing sql query result {result:?}"); - self.sql_query_results_tx.try_send((result, resp_chan)) - } - pub fn send_logs( &self, logs: QueryLogs, @@ -381,24 +252,16 @@ pub fn start_transport( swarm: Swarm>, config: &WorkerConfig, ) -> (impl Stream, WorkerTransportHandle) { - let (query_results_tx, query_results_rx) = - new_queue(config.query_results_queue_size, "query_results"); - let (sql_query_results_tx, sql_query_results_rx) = - new_queue(config.sql_query_results_queue_size, "sql_query_results"); let (logs_tx, logs_rx) = new_queue(config.logs_queue_size, "logs"); let (status_tx, status_rx) = new_queue(config.status_queue_size, "status"); let (events_tx, events_rx) = new_queue(config.events_queue_size, "events"); let transport = WorkerTransport { swarm, - query_results_rx, - sql_query_results_rx, logs_rx, status_rx, events_tx, }; let handle = WorkerTransportHandle::new( - query_results_tx, - sql_query_results_tx, logs_tx, status_tx, transport, diff --git a/crates/transport/src/builder.rs b/crates/transport/src/builder.rs index 157aa9a..05ef628 100644 --- a/crates/transport/src/builder.rs +++ b/crates/transport/src/builder.rs @@ -215,7 +215,17 @@ impl P2PTransportBuilder { pub async fn build_worker( self, config: WorkerConfig, - ) -> Result<(impl Stream, WorkerTransportHandle), Error> { + ) -> Result< + ( + impl Stream, + WorkerTransportHandle, + libp2p_stream::IncomingStreams, + libp2p_stream::IncomingStreams, + ), + Error, + > { + use crate::protocol::{QUERY_PROTOCOL, SQL_QUERY_PROTOCOL}; + let local_peer_id = self.local_peer_id(); // Wait for the worker to be registered on chain loop { @@ -233,8 +243,22 @@ impl P2PTransportBuilder { } break; } - let swarm = self.build_swarm(|base| WorkerBehaviour::new(base, &config))?; - Ok(worker::start_transport(swarm, &config)) + + // Create stream behaviour for query protocols. + // IncomingStreams are returned to the caller for direct handling. + let query_stream_behaviour = libp2p_stream::Behaviour::new(); + let mut control = query_stream_behaviour.new_control(); + let query_streams = control + .accept(StreamProtocol::new(QUERY_PROTOCOL)) + .expect("Query protocol should not already be registered"); + let sql_query_streams = control + .accept(StreamProtocol::new(SQL_QUERY_PROTOCOL)) + .expect("SQL query protocol should not already be registered"); + + let swarm = + self.build_swarm(|base| WorkerBehaviour::new(base, &config, query_stream_behaviour))?; + let (events, handle) = worker::start_transport(swarm, &config); + Ok((events, handle, query_streams, sql_query_streams)) } #[cfg(feature = "sql-client")] diff --git a/crates/transport/src/lib.rs b/crates/transport/src/lib.rs index d2f5767..1a88712 100644 --- a/crates/transport/src/lib.rs +++ b/crates/transport/src/lib.rs @@ -23,7 +23,7 @@ use tokio::sync::mpsc; pub use libp2p::{ identity::{Keypair, ParseError as IdParseError, PublicKey}, request_response::ResponseChannel, - Multiaddr, PeerId, + Multiaddr, PeerId, Stream, }; #[cfg(feature = "metrics")] @@ -70,6 +70,8 @@ pub use crate::actors::portal_logs_collector::{ pub use crate::actors::worker::{ WorkerBehaviour, WorkerConfig, WorkerEvent, WorkerTransportHandle, }; +#[cfg(feature = "worker")] +pub use libp2p_stream::IncomingStreams; #[cfg(feature = "sql-client")] pub use crate::actors::sql_client::{ SQLClientBehaviour, SQLClientConfig, SQLClientTransport, SQLQueryFailure,