diff --git a/README.md b/README.md index ed93287..5b39912 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,24 @@ The repository should be cloned under `/root` so the provided `setup-*.sh` scrip chain per port) - service names are unique within a stack; dependency chains stay intra-stack. Service names may be reused across different stacks +- `protocol` selects how a proxy-reachable service is exposed: `http` (the default — routed by + `Host` header on the shared 80/443 listeners) or `tcp`/`udp`, which each require `listen_port` — + the external port nullnet-proxy binds directly and forwards raw traffic from. `listen_port` must + be globally unique per protocol across every stack (the server refuses to start, or rejects a + hot-reload, if two services claim the same `protocol`/`listen_port` pair): + ``` + [[services]] + name = "redis.internal" + timeout = 0 + protocol = "tcp" + listen_port = 6379 + + [[services]] + name = "dns.internal" + timeout = 0 + protocol = "udp" + listen_port = 53 + ``` - run the project as a daemon (from the repo root) ``` @@ -109,6 +127,10 @@ The repository should be cloned under `/root` so the provided `setup-*.sh` scrip - the proxy listens on port 80 (requests in the form `service_name:80`) and, for hosts that have a TLS certificate, on port 443 — HTTP requests to those hosts get a 301 redirect to HTTPS +- for services declared with `protocol = "tcp"` or `"udp"` in the server's stack config, the proxy + also opens a raw listener on each `listen_port` and forwards traffic to the matching service — + no `Host` header involved. This table is pushed live by the server, so listeners open and close + as `services/.toml` changes, without a proxy restart *** diff --git a/members/nullnet-grpc-lib/proto/nullnet_grpc.proto b/members/nullnet-grpc-lib/proto/nullnet_grpc.proto index bf558cf..b7f3e88 100644 --- a/members/nullnet-grpc-lib/proto/nullnet_grpc.proto +++ b/members/nullnet-grpc-lib/proto/nullnet_grpc.proto @@ -34,6 +34,13 @@ service NullnetGrpc { // subscribe (proxy startup load) and again whenever it changes, so every proxy // hot-reloads without a restart. rpc WatchCertificates(Empty) returns (stream CertBundle); + + // Port mapping distribution APIs --------------------------------------------------------------------------------- + + // Long-lived stream: the server pushes the full TCP/UDP port→service mapping + // table immediately on subscribe and again whenever services.toml changes, + // so the proxy opens/closes raw TCP/UDP listeners without a restart. + rpc WatchPortMappings(Empty) returns (stream PortMappingBundle); } // TAP-based clients --------------------------------------------------------------------------------------------------- @@ -157,6 +164,31 @@ message Upstream { uint32 port = 2; } +// Protocol a proxy-reachable service is exposed over. HTTP is routed by Host +// header on the shared 80/443 listeners; TCP/UDP each get a dedicated +// listen_port the proxy binds directly. +enum ServiceProtocol { + HTTP = 0; + TCP = 1; + UDP = 2; +} + +// One entry in the live port→service table. Only services with a non-HTTP +// protocol need an entry — HTTP stays on the existing Host-header routing. +message PortMapping { + string service_name = 1; + ServiceProtocol protocol = 2; + uint32 listen_port = 3; + // Mirrors the service's configured per-client idle timeout (0 disables + // it), so the proxy can expire UDP sessions without a second timeout + // mechanism. + uint64 idle_timeout_secs = 4; +} + +message PortMappingBundle { + repeated PortMapping mappings = 1; +} + // Backend-triggered chains -------------------------------------------------------------------------------------------- message BackendTriggerRequest { @@ -226,6 +258,10 @@ message AgentEvent { AgentUpstreamIpParseFailed upstream_ip_parse_failed = 20; AgentProxyClientNotInet proxy_client_not_inet = 21; AgentTlsCertificateInvalid tls_certificate_invalid = 23; + AgentTcpListenerBindFailed tcp_listener_bind_failed = 26; + AgentUdpListenerBindFailed udp_listener_bind_failed = 27; + AgentTcpUpstreamConnectFailed tcp_upstream_connect_failed = 28; + AgentUdpUpstreamConnectFailed udp_upstream_connect_failed = 29; // Proxy info events AgentProxyRequestRouted proxy_request_routed = 22; } @@ -255,4 +291,8 @@ message AgentProxyRequestInvalidHost { string client_ip = 1; } message AgentUpstreamIpParseFailed { string raw_ip = 1; string service_name = 2; } message AgentProxyClientNotInet { string address_family = 1; } message AgentTlsCertificateInvalid { string domain = 1; string reason = 2; } -message AgentProxyRequestRouted { string service_name = 1; string client_ip = 2; string upstream_ip = 3; uint64 latency_ms = 4; } \ No newline at end of file +message AgentProxyRequestRouted { string service_name = 1; string client_ip = 2; string upstream_ip = 3; uint64 latency_ms = 4; } +message AgentTcpListenerBindFailed { uint32 listen_port = 1; string service_name = 2; string error_message = 3; } +message AgentUdpListenerBindFailed { uint32 listen_port = 1; string service_name = 2; string error_message = 3; } +message AgentTcpUpstreamConnectFailed { string service_name = 1; string client_ip = 2; string error_message = 3; } +message AgentUdpUpstreamConnectFailed { string service_name = 1; string client_ip = 2; string error_message = 3; } \ No newline at end of file diff --git a/members/nullnet-grpc-lib/src/lib.rs b/members/nullnet-grpc-lib/src/lib.rs index 473076a..4d3073b 100644 --- a/members/nullnet-grpc-lib/src/lib.rs +++ b/members/nullnet-grpc-lib/src/lib.rs @@ -2,8 +2,8 @@ mod proto; use crate::nullnet_grpc::nullnet_grpc_client::NullnetGrpcClient; use crate::nullnet_grpc::{ - AgentEvent, BackendTriggerRequest, CertBundle, Empty, MsgId, NetMessage, NetType, ProxyRequest, - Services, ServicesListResponse, Upstream, + AgentEvent, BackendTriggerRequest, CertBundle, Empty, MsgId, NetMessage, NetType, + PortMappingBundle, ProxyRequest, Services, ServicesListResponse, Upstream, }; pub use proto::*; use tokio::sync::mpsc; @@ -140,4 +140,18 @@ impl NullnetGrpcInterface { .map_err(|e| e.to_string())? .into_inner()) } + + /// Subscribe to port-mapping changes: the returned stream yields the full + /// TCP/UDP port→service table immediately on subscribe and again whenever + /// it changes. + #[allow(clippy::missing_errors_doc)] + pub async fn watch_port_mappings(&self) -> Result, String> { + Ok(self + .client + .clone() + .watch_port_mappings(Request::new(Empty {})) + .await + .map_err(|e| e.to_string())? + .into_inner()) + } } diff --git a/members/nullnet-grpc-lib/src/proto/nullnet_grpc.rs b/members/nullnet-grpc-lib/src/proto/nullnet_grpc.rs index c6950ed..8a2d87d 100644 --- a/members/nullnet-grpc-lib/src/proto/nullnet_grpc.rs +++ b/members/nullnet-grpc-lib/src/proto/nullnet_grpc.rs @@ -167,6 +167,27 @@ pub struct Upstream { #[prost(uint32, tag = "2")] pub port: u32, } +/// One entry in the live port→service table. Only services with a non-HTTP +/// protocol need an entry — HTTP stays on the existing Host-header routing. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct PortMapping { + #[prost(string, tag = "1")] + pub service_name: ::prost::alloc::string::String, + #[prost(enumeration = "ServiceProtocol", tag = "2")] + pub protocol: i32, + #[prost(uint32, tag = "3")] + pub listen_port: u32, + /// Mirrors the service's configured per-client idle timeout (0 disables + /// it), so the proxy can expire UDP sessions without a second timeout + /// mechanism. + #[prost(uint64, tag = "4")] + pub idle_timeout_secs: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PortMappingBundle { + #[prost(message, repeated, tag = "1")] + pub mappings: ::prost::alloc::vec::Vec, +} #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct BackendTriggerRequest { #[prost(string, tag = "1")] @@ -212,7 +233,7 @@ pub struct Empty {} pub struct AgentEvent { #[prost( oneof = "agent_event::Event", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 24, 25, 13, 14, 15, 16, 17, 18, 19, 20, 21, 23, 22" + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 24, 25, 13, 14, 15, 16, 17, 18, 19, 20, 21, 23, 26, 27, 28, 29, 22" )] pub event: ::core::option::Option, } @@ -271,6 +292,14 @@ pub mod agent_event { ProxyClientNotInet(super::AgentProxyClientNotInet), #[prost(message, tag = "23")] TlsCertificateInvalid(super::AgentTlsCertificateInvalid), + #[prost(message, tag = "26")] + TcpListenerBindFailed(super::AgentTcpListenerBindFailed), + #[prost(message, tag = "27")] + UdpListenerBindFailed(super::AgentUdpListenerBindFailed), + #[prost(message, tag = "28")] + TcpUpstreamConnectFailed(super::AgentTcpUpstreamConnectFailed), + #[prost(message, tag = "29")] + UdpUpstreamConnectFailed(super::AgentUdpUpstreamConnectFailed), /// Proxy info events #[prost(message, tag = "22")] ProxyRequestRouted(super::AgentProxyRequestRouted), @@ -447,6 +476,42 @@ pub struct AgentProxyRequestRouted { #[prost(uint64, tag = "4")] pub latency_ms: u64, } +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct AgentTcpListenerBindFailed { + #[prost(uint32, tag = "1")] + pub listen_port: u32, + #[prost(string, tag = "2")] + pub service_name: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub error_message: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct AgentUdpListenerBindFailed { + #[prost(uint32, tag = "1")] + pub listen_port: u32, + #[prost(string, tag = "2")] + pub service_name: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub error_message: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct AgentTcpUpstreamConnectFailed { + #[prost(string, tag = "1")] + pub service_name: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub client_ip: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub error_message: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct AgentUdpUpstreamConnectFailed { + #[prost(string, tag = "1")] + pub service_name: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub client_ip: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub error_message: ::prost::alloc::string::String, +} #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum Net { @@ -473,6 +538,38 @@ impl Net { } } } +/// Protocol a proxy-reachable service is exposed over. HTTP is routed by Host +/// header on the shared 80/443 listeners; TCP/UDP each get a dedicated +/// listen_port the proxy binds directly. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum ServiceProtocol { + Http = 0, + Tcp = 1, + Udp = 2, +} +impl ServiceProtocol { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Http => "HTTP", + Self::Tcp => "TCP", + Self::Udp => "UDP", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "HTTP" => Some(Self::Http), + "TCP" => Some(Self::Tcp), + "UDP" => Some(Self::Udp), + _ => None, + } + } +} /// Generated client implementations. pub mod nullnet_grpc_client { #![allow( @@ -732,6 +829,35 @@ pub mod nullnet_grpc_client { ); self.inner.server_streaming(req, path, codec).await } + /// Long-lived stream: the server pushes the full TCP/UDP port→service mapping + /// table immediately on subscribe and again whenever services.toml changes, + /// so the proxy opens/closes raw TCP/UDP listeners without a restart. + pub async fn watch_port_mappings( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/nullnet_grpc.NullnetGrpc/WatchPortMappings", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("nullnet_grpc.NullnetGrpc", "WatchPortMappings"), + ); + self.inner.server_streaming(req, path, codec).await + } } } /// Generated server implementations. @@ -806,6 +932,22 @@ pub mod nullnet_grpc_server { tonic::Response, tonic::Status, >; + /// Server streaming response type for the WatchPortMappings method. + type WatchPortMappingsStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + std::marker::Send + + 'static; + /// Long-lived stream: the server pushes the full TCP/UDP port→service mapping + /// table immediately on subscribe and again whenever services.toml changes, + /// so the proxy opens/closes raw TCP/UDP listeners without a restart. + async fn watch_port_mappings( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } #[derive(Debug)] pub struct NullnetGrpcServer { @@ -1191,6 +1333,53 @@ pub mod nullnet_grpc_server { }; Box::pin(fut) } + "/nullnet_grpc.NullnetGrpc/WatchPortMappings" => { + #[allow(non_camel_case_types)] + struct WatchPortMappingsSvc(pub Arc); + impl< + T: NullnetGrpc, + > tonic::server::ServerStreamingService + for WatchPortMappingsSvc { + type Response = super::PortMappingBundle; + type ResponseStream = T::WatchPortMappingsStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::watch_port_mappings(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = WatchPortMappingsSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { let mut response = http::Response::new( diff --git a/members/nullnet-proxy/Cargo.toml b/members/nullnet-proxy/Cargo.toml index 5cb2b06..6699268 100644 --- a/members/nullnet-proxy/Cargo.toml +++ b/members/nullnet-proxy/Cargo.toml @@ -15,7 +15,7 @@ openssl = "0.10" arc-swap = "1" nullnet-grpc-lib.workspace = true nullnet-liberror.workspace = true -tokio = { workspace = true, features = ["rt-multi-thread"] } +tokio = { workspace = true, features = ["rt-multi-thread", "net", "time", "sync", "io-util", "macros"] } ctrlc = { version = "3.5.2", features = ["termination"] } gag.workspace = true chrono.workspace = true diff --git a/members/nullnet-proxy/src/main.rs b/members/nullnet-proxy/src/main.rs index 165bbd0..125de47 100644 --- a/members/nullnet-proxy/src/main.rs +++ b/members/nullnet-proxy/src/main.rs @@ -1,6 +1,9 @@ mod env; mod nullnet_proxy; +mod port_mappings; +mod tcp_relay; mod tls; +mod udp_relay; use crate::nullnet_proxy::NullnetProxy; use crate::tls::{CertStore, TlsResolver}; @@ -255,6 +258,10 @@ async fn main() -> Result<(), nullnet_liberror::Error> { tokio::spawn(async move { watch_certificates(server, store).await }); } + // subscribe to the live TCP/UDP port→service table and keep raw listeners + // in sync with it for the lifetime of the process + tokio::spawn(port_mappings::watch_and_serve(nullnet_proxy.clone())); + // HTTP listener: redirects to HTTPS for hosts that have a cert let mut http_proxy = pingora_proxy::http_proxy_service(&my_server.configuration, nullnet_proxy.clone()); diff --git a/members/nullnet-proxy/src/port_mappings.rs b/members/nullnet-proxy/src/port_mappings.rs new file mode 100644 index 0000000..d1c81cf --- /dev/null +++ b/members/nullnet-proxy/src/port_mappings.rs @@ -0,0 +1,149 @@ +use crate::nullnet_proxy::NullnetProxy; +use crate::{tcp_relay, udp_relay}; +use nullnet_grpc_lib::nullnet_grpc::{PortMappingBundle, ServiceProtocol}; +use std::collections::HashMap; +use std::time::Duration; +use tokio::sync::watch; +use tokio::task::JoinHandle; + +#[derive(Clone, Copy, Eq, PartialEq, Hash, Debug)] +pub(crate) enum Protocol { + Tcp, + Udp, +} + +/// The service a `(protocol, listen_port)` pair routes to, plus the idle +/// timeout to apply to local sessions (UDP only — TCP relies on the OS to +/// signal connection close). Mirrors the server's per-service `timeout`; +/// `0` disables it, same as everywhere else in the config. +#[derive(Clone, Debug, PartialEq)] +pub(crate) struct MappingEntry { + pub(crate) service_name: String, + pub(crate) idle_timeout_secs: u64, +} + +struct RunningListener { + handle: JoinHandle<()>, + mapping_tx: watch::Sender, +} + +/// Subscribe to the server's port-mapping stream and keep TCP/UDP listeners in +/// sync with it for as long as the proxy runs. Reconnects on stream drop — +/// the certs watcher already exits the process if the control service itself +/// goes down, so this loop only needs to ride out transient hiccups. +pub(crate) async fn watch_and_serve(proxy: NullnetProxy) { + loop { + match proxy.server.watch_port_mappings().await { + Ok(mut stream) => { + println!("[port-mappings] watch stream opened"); + let mut running: HashMap<(Protocol, u16), RunningListener> = HashMap::new(); + loop { + match stream.message().await { + Ok(Some(bundle)) => { + println!( + "[port-mappings] received bundle: {} mapping(s): [{}]", + bundle.mappings.len(), + bundle + .mappings + .iter() + .map(|m| format!( + "{:?}/{}→'{}'", + ServiceProtocol::try_from(m.protocol) + .unwrap_or(ServiceProtocol::Http), + m.listen_port, + m.service_name + )) + .collect::>() + .join(", ") + ); + reconcile(&proxy, &mut running, bundle); + } + Ok(None) => { + println!("[port-mappings] watch stream closed by server"); + break; + } + Err(e) => { + eprintln!("[port-mappings] watch stream error: {e}"); + break; + } + } + } + for (key, listener) in running { + listener.handle.abort(); + println!("[port-mappings] stopped {key:?} (watch stream closed)"); + } + } + Err(e) => eprintln!("[port-mappings] failed to open watch stream: {e}"), + } + tokio::time::sleep(Duration::from_secs(10)).await; + } +} + +/// Diff the freshly-pushed table against the listeners we currently run: +/// bind a new socket only for ports that just appeared, push updated +/// service_name/timeout to existing listeners without touching their socket, +/// and abort listeners for ports that disappeared. +fn reconcile( + proxy: &NullnetProxy, + running: &mut HashMap<(Protocol, u16), RunningListener>, + bundle: PortMappingBundle, +) { + let mut desired: HashMap<(Protocol, u16), MappingEntry> = HashMap::new(); + for m in bundle.mappings { + let Some(protocol) = local_protocol(m.protocol) else { + continue; + }; + let Ok(listen_port) = u16::try_from(m.listen_port) else { + eprintln!( + "[port-mappings] ignoring '{}': listen_port {} out of range", + m.service_name, m.listen_port + ); + continue; + }; + desired.insert( + (protocol, listen_port), + MappingEntry { + service_name: m.service_name, + idle_timeout_secs: m.idle_timeout_secs, + }, + ); + } + + let removed: Vec<(Protocol, u16)> = running + .keys() + .filter(|key| !desired.contains_key(key)) + .copied() + .collect(); + for key in removed { + if let Some(listener) = running.remove(&key) { + listener.handle.abort(); + println!("[port-mappings] stopped {key:?}"); + } + } + + for (key, entry) in desired { + if let Some(listener) = running.get(&key) { + let _ = listener.mapping_tx.send(entry); + } else { + println!( + "[port-mappings] starting {key:?} -> '{}'", + entry.service_name + ); + let (mapping_tx, mapping_rx) = watch::channel(entry); + let proxy = proxy.clone(); + let handle = match key.0 { + Protocol::Tcp => tokio::spawn(tcp_relay::run(proxy, key.1, mapping_rx)), + Protocol::Udp => tokio::spawn(udp_relay::run(proxy, key.1, mapping_rx)), + }; + running.insert(key, RunningListener { handle, mapping_tx }); + } + } +} + +fn local_protocol(raw: i32) -> Option { + match ServiceProtocol::try_from(raw).ok()? { + ServiceProtocol::Tcp => Some(Protocol::Tcp), + ServiceProtocol::Udp => Some(Protocol::Udp), + ServiceProtocol::Http => None, + } +} diff --git a/members/nullnet-proxy/src/tcp_relay.rs b/members/nullnet-proxy/src/tcp_relay.rs new file mode 100644 index 0000000..9e8a8b4 --- /dev/null +++ b/members/nullnet-proxy/src/tcp_relay.rs @@ -0,0 +1,138 @@ +use crate::nullnet_proxy::NullnetProxy; +use crate::port_mappings::MappingEntry; +use nullnet_grpc_lib::nullnet_grpc::{ + AgentEvent, AgentTcpListenerBindFailed, AgentTcpUpstreamConnectFailed, + AgentUpstreamLookupFailed, ProxyRequest, agent_event::Event as AgentEventKind, +}; +use tokio::io::copy_bidirectional; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::watch; + +/// Bind a raw TCP listener on `listen_port` and forward every accepted +/// connection to the service named in `mapping`, resolving the upstream the +/// same way the HTTP path does (`Proxy` RPC → on-demand VXLAN → local +/// upstream). Exits if the bind fails — the supervisor logs nothing further, +/// the failure event is the record of what happened. +pub(crate) async fn run( + proxy: NullnetProxy, + listen_port: u16, + mapping: watch::Receiver, +) { + let listener = match TcpListener::bind(("0.0.0.0", listen_port)).await { + Ok(l) => l, + Err(e) => { + let service_name = mapping.borrow().service_name.clone(); + eprintln!("[tcp/{listen_port}] bind failed: {e}"); + let _ = proxy + .server + .report_event(AgentEvent { + event: Some(AgentEventKind::TcpListenerBindFailed( + AgentTcpListenerBindFailed { + listen_port: u32::from(listen_port), + service_name, + error_message: e.to_string(), + }, + )), + }) + .await; + return; + } + }; + println!("[tcp/{listen_port}] listening"); + + loop { + match listener.accept().await { + Ok((stream, peer)) => { + let proxy = proxy.clone(); + let entry = mapping.borrow().clone(); + println!( + "[tcp/{listen_port}] accepted connection from {peer} → '{}'", + entry.service_name + ); + tokio::spawn(async move { + handle_connection(proxy, stream, peer.to_string(), entry).await; + }); + } + Err(e) => eprintln!("[tcp/{listen_port}] accept error: {e}"), + } + } +} + +async fn handle_connection( + proxy: NullnetProxy, + mut inbound: TcpStream, + client_addr: String, + entry: MappingEntry, +) { + let client_ip = client_addr + .parse::() + .map(|a| a.ip().to_string()) + .unwrap_or_else(|_| client_addr.clone()); + + let proxy_req = ProxyRequest { + client_ip: client_ip.clone(), + service_name: entry.service_name.clone(), + }; + let upstream = match proxy.get_or_add_upstream(proxy_req).await { + Ok(u) => { + println!( + "[tcp] {client_addr} → '{}' resolved upstream {u}", + entry.service_name + ); + u + } + Err(e) => { + eprintln!( + "[tcp] upstream lookup failed for '{}' (client {client_addr}): {e:?}", + entry.service_name + ); + let _ = proxy + .server + .report_event(AgentEvent { + event: Some(AgentEventKind::UpstreamLookupFailed( + AgentUpstreamLookupFailed { + service_name: entry.service_name, + client_ip, + error_message: format!("{e:?}"), + }, + )), + }) + .await; + return; + } + }; + + let mut outbound = match TcpStream::connect(upstream).await { + Ok(s) => { + println!("[tcp] {client_addr} ↔ {upstream} relay started"); + s + } + Err(e) => { + eprintln!("[tcp] dial upstream {upstream} failed (client {client_addr}): {e}"); + let _ = proxy + .server + .report_event(AgentEvent { + event: Some(AgentEventKind::TcpUpstreamConnectFailed( + AgentTcpUpstreamConnectFailed { + service_name: entry.service_name, + client_ip, + error_message: e.to_string(), + }, + )), + }) + .await; + return; + } + }; + + match copy_bidirectional(&mut inbound, &mut outbound).await { + Ok((to_upstream, to_client)) => { + println!( + "[tcp] {client_addr} ↔ {upstream} relay closed (↑{to_upstream}B ↓{to_client}B)" + ); + } + Err(e) => { + eprintln!("[tcp] {client_addr} ↔ {upstream} relay error: {e}"); + } + } +} diff --git a/members/nullnet-proxy/src/udp_relay.rs b/members/nullnet-proxy/src/udp_relay.rs new file mode 100644 index 0000000..3e6575f --- /dev/null +++ b/members/nullnet-proxy/src/udp_relay.rs @@ -0,0 +1,204 @@ +use crate::nullnet_proxy::NullnetProxy; +use crate::port_mappings::MappingEntry; +use nullnet_grpc_lib::nullnet_grpc::{ + AgentEvent, AgentUdpListenerBindFailed, AgentUdpUpstreamConnectFailed, + AgentUpstreamLookupFailed, ProxyRequest, agent_event::Event as AgentEventKind, +}; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::net::UdpSocket; +use tokio::sync::watch; +use tokio::task::JoinHandle; + +const SWEEP_INTERVAL: Duration = Duration::from_secs(10); +const MAX_DATAGRAM: usize = 65536; + +/// A live client↔upstream relay, keyed by the client's `(ip, port)`. UDP has +/// no connection-close signal, so unlike TCP, sessions are only ever reaped +/// by the idle-timeout sweep below. +struct Session { + /// Connected to the upstream — `send`/`recv` address themselves. + upstream: Arc, + /// Pumps upstream replies back to the client via the shared listening + /// socket. Aborted when the session is swept. + relay_handle: JoinHandle<()>, + last_active: Instant, +} + +/// Bind a UDP listener on `listen_port` and relay datagrams to the service +/// named in `mapping`. The first datagram from a given client address +/// resolves an upstream via the same `Proxy` RPC the TCP/HTTP paths use; +/// subsequent datagrams from that address reuse the session until it goes +/// idle for `mapping.idle_timeout_secs` (0 disables the timeout, same as +/// everywhere else in the config). +pub(crate) async fn run( + proxy: NullnetProxy, + listen_port: u16, + mapping: watch::Receiver, +) { + let socket = match UdpSocket::bind(("0.0.0.0", listen_port)).await { + Ok(s) => s, + Err(e) => { + let service_name = mapping.borrow().service_name.clone(); + eprintln!("[udp/{listen_port}] bind failed: {e}"); + let _ = proxy + .server + .report_event(AgentEvent { + event: Some(AgentEventKind::UdpListenerBindFailed( + AgentUdpListenerBindFailed { + listen_port: u32::from(listen_port), + service_name, + error_message: e.to_string(), + }, + )), + }) + .await; + return; + } + }; + let socket = Arc::new(socket); + println!("[udp/{listen_port}] listening"); + + let mut sessions: HashMap = HashMap::new(); + let mut sweep = tokio::time::interval(SWEEP_INTERVAL); + let mut buf = vec![0u8; MAX_DATAGRAM]; + + loop { + tokio::select! { + recv = socket.recv_from(&mut buf) => { + match recv { + Ok((n, src)) => { + handle_datagram(&proxy, &socket, &mapping, &mut sessions, src, &buf[..n]).await; + } + Err(e) => eprintln!("[udp/{listen_port}] recv error: {e}"), + } + } + _ = sweep.tick() => { + sweep_idle(&mut sessions, mapping.borrow().idle_timeout_secs); + } + } + } +} + +async fn handle_datagram( + proxy: &NullnetProxy, + listen_socket: &Arc, + mapping: &watch::Receiver, + sessions: &mut HashMap, + src: SocketAddr, + data: &[u8], +) { + if let Some(session) = sessions.get_mut(&src) { + session.last_active = Instant::now(); + if let Err(e) = session.upstream.send(data).await { + eprintln!("[udp] send to upstream failed: {e}"); + } + return; + } + + let entry = mapping.borrow().clone(); + let proxy_req = ProxyRequest { + client_ip: src.ip().to_string(), + service_name: entry.service_name.clone(), + }; + let upstream_addr = match proxy.get_or_add_upstream(proxy_req).await { + Ok(u) => { + println!( + "[udp] new session from {src} → '{}' resolved upstream {u}", + entry.service_name + ); + u + } + Err(e) => { + eprintln!( + "[udp] upstream lookup failed for '{}' (client {src}): {e:?}", + entry.service_name + ); + let _ = proxy + .server + .report_event(AgentEvent { + event: Some(AgentEventKind::UpstreamLookupFailed( + AgentUpstreamLookupFailed { + service_name: entry.service_name, + client_ip: src.ip().to_string(), + error_message: format!("{e:?}"), + }, + )), + }) + .await; + return; + } + }; + + let upstream_socket = match UdpSocket::bind(("0.0.0.0", 0)).await { + Ok(s) => s, + Err(e) => { + eprintln!("[udp] failed to open upstream socket for {src}: {e}"); + return; + } + }; + if let Err(e) = upstream_socket.connect(upstream_addr).await { + eprintln!("[udp] connect to upstream {upstream_addr} failed (client {src}): {e}"); + let _ = proxy + .server + .report_event(AgentEvent { + event: Some(AgentEventKind::UdpUpstreamConnectFailed( + AgentUdpUpstreamConnectFailed { + service_name: entry.service_name, + client_ip: src.ip().to_string(), + error_message: e.to_string(), + }, + )), + }) + .await; + return; + } + let upstream_socket = Arc::new(upstream_socket); + + let relay_socket = listen_socket.clone(); + let relay_upstream = upstream_socket.clone(); + let relay_handle = tokio::spawn(async move { + let mut buf = vec![0u8; MAX_DATAGRAM]; + while let Ok(n) = relay_upstream.recv(&mut buf).await { + if relay_socket.send_to(&buf[..n], src).await.is_err() { + break; + } + } + }); + + if let Err(e) = upstream_socket.send(data).await { + eprintln!("[udp] initial send to upstream {upstream_addr} failed (client {src}): {e}"); + } + + sessions.insert( + src, + Session { + upstream: upstream_socket, + relay_handle, + last_active: Instant::now(), + }, + ); +} + +/// `idle_timeout_secs == 0` disables the timeout, same convention as the +/// server's per-service `timeout` everywhere else. +fn sweep_idle(sessions: &mut HashMap, idle_timeout_secs: u64) { + if idle_timeout_secs == 0 { + return; + } + let timeout = Duration::from_secs(idle_timeout_secs); + let now = Instant::now(); + let expired: Vec = sessions + .iter() + .filter(|(_, s)| now.duration_since(s.last_active) >= timeout) + .map(|(addr, _)| *addr) + .collect(); + for addr in expired { + if let Some(session) = sessions.remove(&addr) { + session.relay_handle.abort(); + println!("[udp] session from {addr} swept (idle > {idle_timeout_secs}s)"); + } + } +} diff --git a/members/nullnet-server/src/events.rs b/members/nullnet-server/src/events.rs index bb358fe..de12bd4 100644 --- a/members/nullnet-server/src/events.rs +++ b/members/nullnet-server/src/events.rs @@ -87,6 +87,15 @@ pub(crate) enum Event { stack: String, timestamp: u64, }, + PortMappingConflict { + stack_a: String, + service_a: String, + stack_b: String, + service_b: String, + protocol: String, + listen_port: u16, + timestamp: u64, + }, AllReplicasRemoved { service: String, stack: String, @@ -255,6 +264,30 @@ pub(crate) enum Event { reason: String, timestamp: u64, }, + TcpListenerBindFailed { + listen_port: u16, + service_name: String, + error_message: String, + timestamp: u64, + }, + UdpListenerBindFailed { + listen_port: u16, + service_name: String, + error_message: String, + timestamp: u64, + }, + TcpUpstreamConnectFailed { + service_name: String, + client_ip: String, + error_message: String, + timestamp: u64, + }, + UdpUpstreamConnectFailed { + service_name: String, + client_ip: String, + error_message: String, + timestamp: u64, + }, // --- Proxy info events --- ProxyRequestRouted { @@ -294,6 +327,7 @@ impl Event { Self::SessionTornDown { .. } => "session_torn_down", Self::ConfigReloaded { .. } => "config_reloaded", Self::ConfigStackRemoved { .. } => "config_stack_removed", + Self::PortMappingConflict { .. } => "port_mapping_conflict", Self::AllReplicasRemoved { .. } => "all_replicas_removed", Self::ServiceReachabilityToggled { .. } => "service_reachability_toggled", Self::ProxyClientTimedOut { .. } => "proxy_client_timed_out", @@ -326,6 +360,10 @@ impl Event { Self::UpstreamIpParseFailed { .. } => "upstream_ip_parse_failed", Self::ProxyClientNotInet { .. } => "proxy_client_not_inet", Self::TlsCertificateInvalid { .. } => "tls_certificate_invalid", + Self::TcpListenerBindFailed { .. } => "tcp_listener_bind_failed", + Self::UdpListenerBindFailed { .. } => "udp_listener_bind_failed", + Self::TcpUpstreamConnectFailed { .. } => "tcp_upstream_connect_failed", + Self::UdpUpstreamConnectFailed { .. } => "udp_upstream_connect_failed", Self::ProxyRequestRouted { .. } => "proxy_request_routed", Self::CertificateInstalled { .. } => "certificate_installed", Self::CertificateRenewed { .. } => "certificate_renewed", @@ -383,7 +421,12 @@ impl Event { | Self::ProxyRequestInvalidHost { .. } | Self::UpstreamIpParseFailed { .. } | Self::ProxyClientNotInet { .. } - | Self::TlsCertificateInvalid { .. } => Severity::Error, + | Self::TlsCertificateInvalid { .. } + | Self::PortMappingConflict { .. } + | Self::TcpListenerBindFailed { .. } + | Self::UdpListenerBindFailed { .. } + | Self::TcpUpstreamConnectFailed { .. } + | Self::UdpUpstreamConnectFailed { .. } => Severity::Error, } } @@ -475,6 +518,26 @@ impl Event { } } + #[allow(clippy::too_many_arguments)] + pub(crate) fn port_mapping_conflict( + stack_a: String, + service_a: String, + stack_b: String, + service_b: String, + protocol: String, + listen_port: u16, + ) -> Self { + Self::PortMappingConflict { + stack_a, + service_a, + stack_b, + service_b, + protocol, + listen_port, + timestamp: now_secs(), + } + } + pub(crate) fn all_replicas_removed(service: String, stack: String, ip: String) -> Self { Self::AllReplicasRemoved { service, @@ -765,6 +828,58 @@ impl Event { } } + pub(crate) fn tcp_listener_bind_failed( + listen_port: u16, + service_name: String, + error_message: String, + ) -> Self { + Self::TcpListenerBindFailed { + listen_port, + service_name, + error_message, + timestamp: now_secs(), + } + } + + pub(crate) fn udp_listener_bind_failed( + listen_port: u16, + service_name: String, + error_message: String, + ) -> Self { + Self::UdpListenerBindFailed { + listen_port, + service_name, + error_message, + timestamp: now_secs(), + } + } + + pub(crate) fn tcp_upstream_connect_failed( + service_name: String, + client_ip: String, + error_message: String, + ) -> Self { + Self::TcpUpstreamConnectFailed { + service_name, + client_ip, + error_message, + timestamp: now_secs(), + } + } + + pub(crate) fn udp_upstream_connect_failed( + service_name: String, + client_ip: String, + error_message: String, + ) -> Self { + Self::UdpUpstreamConnectFailed { + service_name, + client_ip, + error_message, + timestamp: now_secs(), + } + } + pub(crate) fn proxy_request_routed( service_name: String, client_ip: String, diff --git a/members/nullnet-server/src/nullnet_grpc_impl.rs b/members/nullnet-server/src/nullnet_grpc_impl.rs index 78e7692..935f4fd 100644 --- a/members/nullnet-server/src/nullnet_grpc_impl.rs +++ b/members/nullnet-server/src/nullnet_grpc_impl.rs @@ -12,8 +12,9 @@ use crate::services::service_info::{ServiceInfo, backend_involved_services}; use crate::timeout::check_timeouts; use nullnet_grpc_lib::nullnet_grpc::nullnet_grpc_server::NullnetGrpc; use nullnet_grpc_lib::nullnet_grpc::{ - AgentEvent, BackendTriggerRequest, CertBundle, Empty, MsgId, NetMessage, NetType, ProxyRequest, - ServiceTrigger, Services, ServicesListResponse, Upstream, agent_event::Event as AgentEventKind, + AgentEvent, BackendTriggerRequest, CertBundle, Empty, MsgId, NetMessage, NetType, PortMapping, + PortMappingBundle, ProxyRequest, ServiceTrigger, Services, ServicesListResponse, Upstream, + agent_event::Event as AgentEventKind, }; use nullnet_liberror::{Error, ErrorHandler, Location, location}; use std::collections::{HashMap, HashSet}; @@ -32,6 +33,37 @@ pub(crate) struct NullnetGrpcImpl { /// Latest TLS certificate set, kept in sync with `./certs` by a watcher. /// Proxies fetch the current value and subscribe for updates. certs: watch::Receiver, + /// Live TCP/UDP port→service table, derived from `services` and refreshed + /// on every services.toml change. Proxies subscribe for updates. + port_mappings: watch::Receiver, +} + +/// Build the live TCP/UDP port→service table from the current `StackMap`. +/// `Http` services are excluded — they stay on Host-header routing. +fn build_port_mapping_bundle(stacks: &StackMap) -> PortMappingBundle { + let mappings: Vec = stacks + .values() + .flat_map(HashMap::iter) + .filter_map(|(name, info)| { + let listen_port = u32::from(info.listen_port()?); + Some(PortMapping { + service_name: name.clone(), + protocol: info.protocol() as i32, + listen_port, + idle_timeout_secs: info.timeout().unwrap_or(0), + }) + }) + .collect(); + println!( + "[port-mappings] bundle built: {} mapping(s): [{}]", + mappings.len(), + mappings + .iter() + .map(|m| format!("{}/{}", m.listen_port, m.service_name)) + .collect::>() + .join(", ") + ); + PortMappingBundle { mappings } } /// Return the stack name that holds `service_name`, if any. Service names @@ -46,7 +78,7 @@ fn find_service_stack<'a>(services: &'a StackMap, service_name: &str) -> Option< impl NullnetGrpcImpl { pub async fn new() -> Result { - let services = Arc::new(RwLock::new(ServicesToml::load().await?)); + let services = Arc::new(RwLock::new(ServicesToml::load_validated().await?)); // regenerate the service graphviz periodically for debugging let services_2 = services.clone(); @@ -56,18 +88,43 @@ impl NullnetGrpcImpl { let orchestrator = Orchestrator::new(); let config_changed = Arc::new(Notify::new()); + // Separate from `config_changed`: `Notify::notify_one` wakes at most + // one waiter, so each consumer needs its own `Notify` rather than + // racing `check_timeouts` for the same wake-up. + let port_mappings_changed = Arc::new(Notify::new()); // keep services up to date with the services.toml file let services_2 = services.clone(); let orchestrator_2 = orchestrator.clone(); let config_changed_2 = config_changed.clone(); + let port_mappings_changed_2 = port_mappings_changed.clone(); tokio::spawn(async move { - if let Err(e) = ServicesToml::watch(&services_2, orchestrator_2, config_changed_2).await + if let Err(e) = ServicesToml::watch( + &services_2, + orchestrator_2, + config_changed_2, + port_mappings_changed_2, + ) + .await { eprintln!("failed to watch services.toml for changes: {e:?}"); } }); + // live TCP/UDP port→service table, refreshed whenever services.toml changes + let initial_mappings = build_port_mapping_bundle(&*services.read().await); + let (port_mappings_tx, port_mappings_rx) = watch::channel(initial_mappings); + let services_2 = services.clone(); + tokio::spawn(async move { + loop { + port_mappings_changed.notified().await; + let bundle = build_port_mapping_bundle(&*services_2.read().await); + if port_mappings_tx.send(bundle).is_err() { + break; + } + } + }); + // periodically check for timed-out proxy clients and tear down their chains let services_2 = services.clone(); let orchestrator_2 = orchestrator.clone(); @@ -87,6 +144,7 @@ impl NullnetGrpcImpl { services, orchestrator, certs: certs_rx, + port_mappings: port_mappings_rx, }) } @@ -1007,10 +1065,12 @@ struct SuccessfulEdge { impl NullnetGrpcImpl { pub(crate) fn new_for_test(services: StackMap) -> Self { let (_, certs) = watch::channel(CertBundle::default()); + let (_, port_mappings) = watch::channel(PortMappingBundle::default()); NullnetGrpcImpl { services: Arc::new(RwLock::new(services)), orchestrator: Orchestrator::new(), certs, + port_mappings, } } @@ -1100,6 +1160,30 @@ impl NullnetGrpc for NullnetGrpcImpl { Ok(Response::new(ReceiverStream::new(rx))) } + type WatchPortMappingsStream = ReceiverStream>; + + async fn watch_port_mappings( + &self, + _: Request, + ) -> Result, Status> { + let mut mappings = self.port_mappings.clone(); + let (tx, rx) = mpsc::channel(4); + tokio::spawn(async move { + // send the current table immediately, then one snapshot per change + let initial = mappings.borrow_and_update().clone(); + if tx.send(Ok(initial)).await.is_err() { + return; + } + while mappings.changed().await.is_ok() { + let snapshot = mappings.borrow_and_update().clone(); + if tx.send(Ok(snapshot)).await.is_err() { + break; + } + } + }); + Ok(Response::new(ReceiverStream::new(rx))) + } + async fn report_event(&self, req: Request) -> Result, Status> { let Some(kind) = req.into_inner().event else { return Ok(Response::new(Empty {})); @@ -1167,6 +1251,22 @@ impl NullnetGrpc for NullnetGrpcImpl { AgentEventKind::TlsCertificateInvalid(e) => { Event::tls_certificate_invalid(e.domain, e.reason) } + AgentEventKind::TcpListenerBindFailed(e) => Event::tcp_listener_bind_failed( + e.listen_port as u16, + e.service_name, + e.error_message, + ), + AgentEventKind::UdpListenerBindFailed(e) => Event::udp_listener_bind_failed( + e.listen_port as u16, + e.service_name, + e.error_message, + ), + AgentEventKind::TcpUpstreamConnectFailed(e) => { + Event::tcp_upstream_connect_failed(e.service_name, e.client_ip, e.error_message) + } + AgentEventKind::UdpUpstreamConnectFailed(e) => { + Event::udp_upstream_connect_failed(e.service_name, e.client_ip, e.error_message) + } AgentEventKind::ProxyRequestRouted(e) => Event::proxy_request_routed( e.service_name, e.client_ip, diff --git a/members/nullnet-server/src/services/changes.rs b/members/nullnet-server/src/services/changes.rs index bd79e86..3f3a1bd 100644 --- a/members/nullnet-server/src/services/changes.rs +++ b/members/nullnet-server/src/services/changes.rs @@ -226,9 +226,18 @@ async fn teardown_invalidated_service( let triggers = si.triggers().clone(); let timeout = si.timeout(); let max_nets = si.max_networks(); + let protocol = si.protocol(); + let listen_port = si.listen_port(); services.insert( invalidated_service.to_string(), - ServiceInfo::new(proxy_deps, triggers, timeout, max_nets), + ServiceInfo::new( + proxy_deps, + triggers, + timeout, + max_nets, + protocol, + listen_port, + ), ); } } diff --git a/members/nullnet-server/src/services/input.rs b/members/nullnet-server/src/services/input.rs index f5ee09e..e55bb40 100644 --- a/members/nullnet-server/src/services/input.rs +++ b/members/nullnet-server/src/services/input.rs @@ -3,6 +3,7 @@ use crate::orchestrator::Orchestrator; use crate::services::changes::{apply_changes, detect_config_changes}; use crate::services::service_info::ServiceInfo; use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; +use nullnet_grpc_lib::nullnet_grpc::ServiceProtocol; use nullnet_liberror::{Error, ErrorHandler, Location, location}; use serde::Deserialize; use std::collections::HashMap; @@ -58,10 +59,39 @@ impl ServicesToml { parse_file(Path::new(path)).await } + /// Load every stack and fail loudly if any `(protocol, listen_port)` pair + /// is claimed by more than one service — ports are global on the proxy, + /// unlike service names which only need to be unique within a stack. + pub(crate) async fn load_validated() -> Result { + let stacks = Self::load().await?; + let conflicts = detect_port_conflicts(&stacks); + if let Some(first) = conflicts.first() { + return Err(format!( + "port mapping conflict: {} other conflict(s); stack '{}' service '{}' and stack '{}' \ + service '{}' both claim {:?}/{}", + conflicts.len() - 1, + first.stack_a, + first.service_a, + first.stack_b, + first.service_b, + first.protocol, + first.listen_port + )) + .handle_err(location!()); + } + Ok(stacks) + } + + /// `config_changed` and `port_mappings_changed` are separate `Notify`s — + /// each has exactly one consumer (`check_timeouts` and the port-mapping + /// refresh task, respectively). `Notify::notify_one` wakes at most one + /// waiter, so sharing a single `Notify` across two consumers would have + /// them race for each wake-up instead of both reliably observing it. pub(crate) async fn watch( services: &Arc>, orchestrator: Orchestrator, config_changed: Arc, + port_mappings_changed: Arc, ) -> Result<(), Error> { let services_directory = PathBuf::from(SERVICES_DIR); @@ -95,10 +125,36 @@ impl ServicesToml { if last_update_time.elapsed().as_millis() > 100 { // ensure file changes are propagated tokio::time::sleep(Duration::from_millis(100)).await; - if let Ok(loaded_services) = ServicesToml::load().await { - let services_mut = &mut *services.write().await; - apply_config_update(services_mut, loaded_services, &orchestrator).await; - config_changed.notify_one(); + match ServicesToml::load().await { + Ok(loaded_services) => { + let conflicts = detect_port_conflicts(&loaded_services); + if conflicts.is_empty() { + let services_mut = &mut *services.write().await; + apply_config_update(services_mut, loaded_services, &orchestrator) + .await; + config_changed.notify_one(); + port_mappings_changed.notify_one(); + } else { + eprintln!( + "Rejecting services.toml reload: {} port mapping conflict(s)", + conflicts.len() + ); + for c in conflicts { + orchestrator + .events + .emit(ServerEvent::port_mapping_conflict( + c.stack_a, + c.service_a, + c.stack_b, + c.service_b, + format!("{:?}", c.protocol), + c.listen_port, + )) + .await; + } + } + } + Err(e) => eprintln!("Failed to reload services.toml: {e:?}"), } last_update_time = Instant::now(); } @@ -108,7 +164,7 @@ impl ServicesToml { Ok(()) } - pub(crate) fn services_map(self) -> HashMap { + pub(crate) fn services_map(self) -> Result, Error> { // Every name referenced in a proxy branch or trigger chain is registered // as a discoverable, non-entry-point placeholder (no deps, no timeout) so // hosts can register its replicas. The full chain lives on the declaring @@ -128,7 +184,14 @@ impl ServicesToml { for name in dep_names { ret_val.insert( name, - ServiceInfo::new(Vec::new(), HashMap::new(), None, None), + ServiceInfo::new( + Vec::new(), + HashMap::new(), + None, + None, + ServiceProtocol::Http, + None, + ), ); } @@ -137,23 +200,92 @@ impl ServicesToml { // proxy-reachable entry point, `None` (omitted) leaves it backend-only. // A declared service can thus host triggers without being reachable. for s in self.services { + let protocol = s + .protocol + .map_or(ServiceProtocol::Http, ServiceProtocol::from); + match (protocol, s.listen_port) { + (ServiceProtocol::Http, Some(_)) => { + return Err(format!( + "service '{}': 'listen_port' is only valid for protocol \"tcp\"/\"udp\", not \"http\"", + s.name + )) + .handle_err(location!()); + } + (ServiceProtocol::Tcp | ServiceProtocol::Udp, None) => { + return Err(format!( + "service '{}': protocol \"tcp\"/\"udp\" requires 'listen_port'", + s.name + )) + .handle_err(location!()); + } + _ => {} + } let triggers = s.triggers.into_iter().map(|t| (t.port, t.chain)).collect(); ret_val.insert( s.name, - ServiceInfo::new(s.proxy_dependencies, triggers, s.timeout, s.max_networks), + ServiceInfo::new( + s.proxy_dependencies, + triggers, + s.timeout, + s.max_networks, + protocol, + s.listen_port, + ), ); } - ret_val + Ok(ret_val) } } +/// Two services (possibly in different stacks) that both declared the same +/// `(protocol, listen_port)` pair. Ports are global on the proxy, so unlike +/// service names — which only need to be unique within a stack — these must +/// be unique across every stack. +pub(crate) struct PortConflict { + pub(crate) stack_a: String, + pub(crate) service_a: String, + pub(crate) stack_b: String, + pub(crate) service_b: String, + pub(crate) protocol: ServiceProtocol, + pub(crate) listen_port: u16, +} + +/// Scan every stack for `(protocol, listen_port)` pairs claimed by more than +/// one service. `Http` services are excluded — they have no `listen_port`. +pub(crate) fn detect_port_conflicts(stacks: &StackMap) -> Vec { + let mut claimed: HashMap<(ServiceProtocol, u16), (String, String)> = HashMap::new(); + let mut conflicts = Vec::new(); + for (stack, services) in stacks { + for (name, info) in services { + let Some(listen_port) = info.listen_port() else { + continue; + }; + let key = (info.protocol(), listen_port); + match claimed.get(&key) { + Some((other_stack, other_service)) => conflicts.push(PortConflict { + stack_a: other_stack.clone(), + service_a: other_service.clone(), + stack_b: stack.clone(), + service_b: name.clone(), + protocol: key.0, + listen_port, + }), + None => { + claimed.insert(key, (stack.clone(), name.clone())); + } + } + } + } + conflicts +} + async fn parse_file(path: &Path) -> Result, Error> { let str_repr = tokio::fs::read_to_string(path) .await .handle_err(location!())?; let parsed: ServicesToml = toml::from_str(&str_repr).handle_err(location!())?; - Ok(parsed.services_map()) + parsed.services_map() } pub(crate) async fn apply_config_update( @@ -226,6 +358,32 @@ struct ServiceToml { /// When the limit is reached, new proxy clients reuse an existing network /// on the same proxy node instead of creating a new one. max_networks: Option, + /// Protocol this service is reached over via the proxy. Omitted defaults + /// to `http` (Host-header routing on the shared 80/443 listeners). + /// `tcp`/`udp` each require `listen_port` — the proxy binds that port + /// directly and forwards raw traffic to this service. + protocol: Option, + /// External port the proxy listens on for this service. Required (and + /// only meaningful) when `protocol` is `tcp` or `udp`. + listen_port: Option, +} + +#[derive(Deserialize, Clone, Copy, Debug, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +enum ProtocolToml { + Http, + Tcp, + Udp, +} + +impl From for ServiceProtocol { + fn from(value: ProtocolToml) -> Self { + match value { + ProtocolToml::Http => ServiceProtocol::Http, + ProtocolToml::Tcp => ServiceProtocol::Tcp, + ProtocolToml::Udp => ServiceProtocol::Udp, + } + } } #[derive(Deserialize)] @@ -256,7 +414,7 @@ name = "fs.color.com" timeout = 30 "#; let parsed: ServicesToml = toml::from_str(toml_str).unwrap(); - let map = parsed.services_map(); + let map = parsed.services_map().unwrap(); // explicit entry points keep their configured timeout assert_eq!(map["color.com"].timeout(), Some(0)); @@ -284,7 +442,7 @@ port = 5555 chain = ["dep.b"] "#; let parsed: ServicesToml = toml::from_str(toml_str).unwrap(); - let map = parsed.services_map(); + let map = parsed.services_map().unwrap(); // Not proxy-reachable... assert_eq!(map["backend.only"].timeout(), None); @@ -302,7 +460,7 @@ timeout = 0 proxy_dependencies = [["a", "b"], ["c", "d"]] "#; let parsed: ServicesToml = toml::from_str(toml_str).unwrap(); - let map = parsed.services_map(); + let map = parsed.services_map().unwrap(); // The entry point keeps both branches verbatim. assert_eq!( @@ -366,4 +524,131 @@ timeout = 30 let _ = tokio::fs::remove_dir_all(&dir).await; } + + #[test] + fn tcp_udp_services_parse_with_listen_port() { + let toml_str = r#" +[[services]] +name = "redis.internal" +timeout = 0 +protocol = "tcp" +listen_port = 6379 + +[[services]] +name = "dns.internal" +timeout = 0 +protocol = "udp" +listen_port = 53 +"#; + let parsed: ServicesToml = toml::from_str(toml_str).unwrap(); + let map = parsed.services_map().unwrap(); + + assert_eq!(map["redis.internal"].protocol(), ServiceProtocol::Tcp); + assert_eq!(map["redis.internal"].listen_port(), Some(6379)); + assert_eq!(map["dns.internal"].protocol(), ServiceProtocol::Udp); + assert_eq!(map["dns.internal"].listen_port(), Some(53)); + } + + #[test] + fn http_service_defaults_with_no_listen_port() { + let toml_str = r#" +[[services]] +name = "color.com" +timeout = 0 +"#; + let parsed: ServicesToml = toml::from_str(toml_str).unwrap(); + let map = parsed.services_map().unwrap(); + + assert_eq!(map["color.com"].protocol(), ServiceProtocol::Http); + assert_eq!(map["color.com"].listen_port(), None); + } + + #[test] + fn tcp_protocol_without_listen_port_is_rejected() { + let toml_str = r#" +[[services]] +name = "redis.internal" +timeout = 0 +protocol = "tcp" +"#; + let parsed: ServicesToml = toml::from_str(toml_str).unwrap(); + assert!(parsed.services_map().is_err()); + } + + #[test] + fn http_protocol_with_listen_port_is_rejected() { + let toml_str = r#" +[[services]] +name = "color.com" +timeout = 0 +listen_port = 6379 +"#; + let parsed: ServicesToml = toml::from_str(toml_str).unwrap(); + assert!(parsed.services_map().is_err()); + } + + #[test] + fn detect_port_conflicts_flags_cross_stack_collision() { + let mut alpha = HashMap::new(); + alpha.insert( + "redis.a".to_string(), + ServiceInfo::new( + vec![], + HashMap::new(), + Some(0), + None, + ServiceProtocol::Tcp, + Some(6379), + ), + ); + let mut bravo = HashMap::new(); + bravo.insert( + "redis.b".to_string(), + ServiceInfo::new( + vec![], + HashMap::new(), + Some(0), + None, + ServiceProtocol::Tcp, + Some(6379), + ), + ); + let stacks: StackMap = + HashMap::from([("alpha".to_string(), alpha), ("bravo".to_string(), bravo)]); + + let conflicts = detect_port_conflicts(&stacks); + assert_eq!(conflicts.len(), 1); + assert_eq!(conflicts[0].listen_port, 6379); + assert_eq!(conflicts[0].protocol, ServiceProtocol::Tcp); + } + + #[test] + fn detect_port_conflicts_allows_same_port_different_protocol() { + let mut alpha = HashMap::new(); + alpha.insert( + "dns.tcp".to_string(), + ServiceInfo::new( + vec![], + HashMap::new(), + Some(0), + None, + ServiceProtocol::Tcp, + Some(53), + ), + ); + alpha.insert( + "dns.udp".to_string(), + ServiceInfo::new( + vec![], + HashMap::new(), + Some(0), + None, + ServiceProtocol::Udp, + Some(53), + ), + ); + let stacks: StackMap = HashMap::from([("alpha".to_string(), alpha)]); + + assert!(detect_port_conflicts(&stacks).is_empty()); + } } diff --git a/members/nullnet-server/src/services/service_info.rs b/members/nullnet-server/src/services/service_info.rs index cdc061a..015dde6 100644 --- a/members/nullnet-server/src/services/service_info.rs +++ b/members/nullnet-server/src/services/service_info.rs @@ -1,7 +1,7 @@ use crate::orchestrator::Orchestrator; use crate::services::clients::{Client, ClientInfo, Clients}; use crate::services::edge::Edge; -use nullnet_grpc_lib::nullnet_grpc::Upstream; +use nullnet_grpc_lib::nullnet_grpc::{ServiceProtocol, Upstream}; use std::collections::{HashMap, HashSet}; use std::net::{IpAddr, Ipv4Addr}; use std::time::{Duration, Instant}; @@ -35,17 +35,22 @@ pub(crate) enum ServiceInfo { } impl ServiceInfo { + #[allow(clippy::too_many_arguments)] pub(crate) fn new( proxy_deps: Vec>, triggers: HashMap>, timeout: Option, max_networks: Option, + protocol: ServiceProtocol, + listen_port: Option, ) -> Self { ServiceInfo::Unregistered(UnregisteredServiceInfo::new( proxy_deps, triggers, timeout, max_networks, + protocol, + listen_port, )) } @@ -57,6 +62,8 @@ impl ServiceInfo { triggers: unreg.triggers.clone(), timeout: unreg.timeout, max_networks: unreg.max_networks, + protocol: unreg.protocol, + listen_port: unreg.listen_port, replicas: vec![Replica::new(ip, port, docker_container)], }); } @@ -95,6 +102,8 @@ impl ServiceInfo { reg.triggers.clone(), reg.timeout, reg.max_networks, + reg.protocol, + reg.listen_port, )); } } @@ -112,6 +121,8 @@ impl ServiceInfo { reg.triggers.clone(), reg.timeout, reg.max_networks, + reg.protocol, + reg.listen_port, )); } } @@ -127,18 +138,24 @@ impl ServiceInfo { pub(crate) fn update_from_file(&mut self, loaded: &Self) { let loaded_timeout = loaded.timeout(); let loaded_max_networks = loaded.max_networks(); + let loaded_protocol = loaded.protocol(); + let loaded_listen_port = loaded.listen_port(); match self { ServiceInfo::Unregistered(unreg) => { unreg.proxy_deps = loaded.proxy_deps().to_vec(); unreg.triggers.clone_from(loaded.triggers()); unreg.timeout = loaded_timeout; unreg.max_networks = loaded_max_networks; + unreg.protocol = loaded_protocol; + unreg.listen_port = loaded_listen_port; } ServiceInfo::Registered(reg) => { reg.proxy_deps = loaded.proxy_deps().to_vec(); reg.triggers.clone_from(loaded.triggers()); reg.timeout = loaded_timeout; reg.max_networks = loaded_max_networks; + reg.protocol = loaded_protocol; + reg.listen_port = loaded_listen_port; } } } @@ -150,6 +167,25 @@ impl ServiceInfo { } } + /// Protocol this service is reachable over via the proxy. `Http` is the + /// default — routed by Host header on the shared 80/443 listeners. `Tcp`/ + /// `Udp` services are reached on their own `listen_port`. + pub(crate) fn protocol(&self) -> ServiceProtocol { + match self { + ServiceInfo::Unregistered(unreg) => unreg.protocol, + ServiceInfo::Registered(reg) => reg.protocol, + } + } + + /// The external port the proxy binds to for `Tcp`/`Udp` services. + /// Always `None` for `Http` services (routed by Host header instead). + pub(crate) fn listen_port(&self) -> Option { + match self { + ServiceInfo::Unregistered(unreg) => unreg.listen_port, + ServiceInfo::Registered(reg) => reg.listen_port, + } + } + pub(crate) fn proxy_deps(&self) -> &[Vec] { match self { ServiceInfo::Unregistered(unreg) => &unreg.proxy_deps, @@ -183,20 +219,29 @@ pub(crate) struct UnregisteredServiceInfo { timeout: Option, /// Maximum number of networks for this service. max_networks: Option, + /// Protocol this service is reachable over via the proxy (default `Http`). + protocol: ServiceProtocol, + /// External port the proxy binds to for `Tcp`/`Udp` services. + listen_port: Option, } impl UnregisteredServiceInfo { + #[allow(clippy::too_many_arguments)] fn new( proxy_deps: Vec>, triggers: HashMap>, timeout: Option, max_networks: Option, + protocol: ServiceProtocol, + listen_port: Option, ) -> Self { Self { proxy_deps, triggers, timeout, max_networks, + protocol, + listen_port, } } } @@ -280,6 +325,10 @@ pub(crate) struct RegisteredServiceInfo { timeout: Option, /// Maximum number of networks for this service. max_networks: Option, + /// Protocol this service is reachable over via the proxy (default `Http`). + protocol: ServiceProtocol, + /// External port the proxy binds to for `Tcp`/`Udp` services. + listen_port: Option, /// Replicas of this service. replicas: Vec, } diff --git a/members/nullnet-server/src/tests.rs b/members/nullnet-server/src/tests.rs index 528a635..d52d11a 100644 --- a/members/nullnet-server/src/tests.rs +++ b/members/nullnet-server/src/tests.rs @@ -5,7 +5,7 @@ use crate::nullnet_grpc_impl::NullnetGrpcImpl; use crate::services::input::{ServicesToml, StackMap, apply_config_update}; use crate::services::service_info::ServiceInfo; use crate::timeout::apply_timeouts; -use nullnet_grpc_lib::nullnet_grpc::{NetMessage, net_message}; +use nullnet_grpc_lib::nullnet_grpc::{NetMessage, ServiceProtocol, net_message}; use std::collections::{HashMap, HashSet}; use std::net::{IpAddr, Ipv4Addr}; @@ -2760,11 +2760,25 @@ fn suspend_test_server() -> NullnetGrpcImpl { // both are proxy entry points (timeout = Some); one Docker-backed, one host inner.insert( "svc".to_string(), - ServiceInfo::new(vec![], HashMap::new(), Some(30), None), + ServiceInfo::new( + vec![], + HashMap::new(), + Some(30), + None, + ServiceProtocol::Http, + None, + ), ); inner.insert( "host_svc".to_string(), - ServiceInfo::new(vec![], HashMap::new(), Some(30), None), + ServiceInfo::new( + vec![], + HashMap::new(), + Some(30), + None, + ServiceProtocol::Http, + None, + ), ); NullnetGrpcImpl::new_for_test(into_stack_map(inner)) } @@ -2832,17 +2846,33 @@ async fn backend_involved_replicas_never_suspended() { HashMap::from([(8080u16, vec!["dep".to_string()])]), Some(30), None, + ServiceProtocol::Http, + None, ), ); // dep is named in the trigger chain (so it "is a backend dep") inner.insert( "dep".to_string(), - ServiceInfo::new(vec![], HashMap::new(), None, None), + ServiceInfo::new( + vec![], + HashMap::new(), + None, + None, + ServiceProtocol::Http, + None, + ), ); // a plain entry-point service with no backend involvement (control) inner.insert( "plain".to_string(), - ServiceInfo::new(vec![], HashMap::new(), Some(30), None), + ServiceInfo::new( + vec![], + HashMap::new(), + Some(30), + None, + ServiceProtocol::Http, + None, + ), ); let server = NullnetGrpcImpl::new_for_test(into_stack_map(inner));