diff --git a/app/src/commands/service.rs b/app/src/commands/service.rs index 0dc35275..49d1c1f4 100644 --- a/app/src/commands/service.rs +++ b/app/src/commands/service.rs @@ -35,13 +35,17 @@ pub struct Start { help = "Worker thread number the tokio `Runtime` will use" )] pub threads: Option, - /// Maximum concurrent enclave ECALLs across serial and speculative paths. - /// Set this to match the loaded enclave's `TCSNum`; the default assumes a - /// conservative TCS budget of 4. + /// Size of the dedicated ECALL worker pool that owns the set of OS + /// threads allowed to enter the enclave. Under `TCSPolicy=BIND` the + /// Intel SGX SDK pins one TCS to each ECALL-issuing thread for the + /// thread's lifetime, so this value also bounds the cumulative number + /// of TCS bindings created by the service. Set this to a value strictly + /// less than the enclave's `TCSNum` to leave headroom for the SDK + /// runtime and any speculative path that spawns ad-hoc workers. #[clap( long = "max-enclave-concurrency", default_value_t = 4, - help = "Maximum concurrent enclave ECALLs" + help = "Size of the dedicated ECALL worker pool" )] pub max_enclave_concurrency: usize, /// Maximum concurrent speculative update-client requests. 
@@ -89,7 +93,12 @@ impl ServiceCmd { enclave_parallelism ); } - let srv = ElcService::new(opts.get_home(), enclave, speculative_concurrency_limit); + let srv = ElcService::new( + opts.get_home(), + enclave, + speculative_concurrency_limit, + enclave_parallelism, + ); info!( "start service: addr={addr} mrenclave={mrenclave} speculative_concurrency_limit={} enclave_parallelism={}",
diff --git a/modules/service/src/ecall_pool.rs b/modules/service/src/ecall_pool.rs
new file mode 100644
index 00000000..497bb96a
--- /dev/null
+++ b/modules/service/src/ecall_pool.rs
@@ -0,0 +1,216 @@
+use log::*;
+use std::panic::{self, AssertUnwindSafe};
+use std::sync::mpsc::{channel, Receiver, Sender};
+use std::sync::{Arc, Mutex};
+use std::thread::{self, JoinHandle};
+
+/// A type-erased unit of work executed on an ECALL worker thread.
+type Job = Box<dyn FnOnce() + Send + 'static>;
+
+/// A fixed-size pool of long-lived OS threads dedicated to executing ECALLs.
+///
+/// Under `TCSPolicy=BIND`, the Intel SGX SDK binds a TCS to each host thread
+/// on its first ECALL and only releases the binding when the thread
+/// terminates. Without an upper bound on the set of distinct threads that
+/// ever ECALL, cumulative bindings can exceed `TCSNum` even when concurrent
+/// ECALLs stay well below it, producing transient `SGX_ERROR_OUT_OF_TCS`
+/// failures. See datachainlab/docs#10123.
+///
+/// `EcallPool` solves this by pinning ECALL execution to exactly `size`
+/// permanent worker threads. Workers are spawned once at service start and
+/// live for the entire process lifetime; their TCS bindings are therefore
+/// stable at `size` and never accumulate.
+///
+/// A job that panics does not terminate its worker: the panic is caught on
+/// the worker thread and resurfaces as a panic in the caller of
+/// [`EcallPool::run`], so the pool always keeps `size` live threads.
+pub struct EcallPool {
+    /// Submission side of the job queue; taken (set to `None`) on drop.
+    sender: Option<Sender<Job>>,
+    /// Handles of the worker threads, joined on drop.
+    workers: Vec<JoinHandle<()>>,
+}
+
+impl EcallPool {
+    /// Creates a pool with `size` permanent worker threads (`size.max(1)`).
+    /// Callers should set `size` equal to `--max-enclave-concurrency`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if a worker thread cannot be spawned.
+    pub fn new(size: usize) -> Self {
+        let size = size.max(1);
+        let (sender, receiver) = channel::<Job>();
+        // `Receiver` is single-consumer, so the workers share it behind a mutex.
+        let receiver = Arc::new(Mutex::new(receiver));
+        let workers = (0..size)
+            .map(|i| {
+                let receiver = Arc::clone(&receiver);
+                thread::Builder::new()
+                    .name(format!("ecall-{}", i))
+                    .spawn(move || ecall_worker_loop(i, receiver))
+                    .expect("failed to spawn ECALL pool worker")
+            })
+            .collect();
+        Self {
+            sender: Some(sender),
+            workers,
+        }
+    }
+
+    /// Runs `f` on one of the pool's worker threads, blocking the caller
+    /// until the job completes. Jobs are queued without bound: when every
+    /// worker is busy, the job waits until one becomes free.
+    ///
+    /// Do not call `run` from inside a job. The nested job needs a free
+    /// worker, so the pool deadlocks once every worker is blocked this way.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `f` panics (the worker itself survives) or if no worker is
+    /// left to receive the job.
+    pub fn run<F, R>(&self, f: F) -> R
+    where
+        F: FnOnce() -> R + Send + 'static,
+        R: Send + 'static,
+    {
+        let sender = self
+            .sender
+            .as_ref()
+            .expect("ECALL pool used after shutdown");
+        let (tx, rx) = channel();
+        let job: Job = Box::new(move || {
+            let _ = tx.send(f());
+        });
+        sender
+            .send(job)
+            .expect("ECALL pool worker channel closed");
+        // `recv` fails if `tx` was dropped without sending, e.g. because `f` panicked.
+        rx.recv()
+            .expect("ECALL pool worker terminated before producing a result")
+    }
+}
+
+impl Drop for EcallPool {
+    fn drop(&mut self) {
+        // Closing the sender lets each worker observe `Err` on `recv` and
+        // exit its loop. We then join every worker so SGX SDK destructors
+        // run before the surrounding process resources are torn down.
+        drop(self.sender.take());
+        for worker in self.workers.drain(..) {
+            if let Err(e) = worker.join() {
+                warn!("ECALL pool worker panicked at shutdown: {:?}", e);
+            }
+        }
+    }
+}
+
+/// Body of each worker thread: runs queued jobs until the queue's sending
+/// side is closed.
+fn ecall_worker_loop(index: usize, receiver: Arc<Mutex<Receiver<Job>>>) {
+    debug!("ECALL worker {} started", index);
+    loop {
+        // The guard is scoped to this block, so the lock is released before
+        // the job runs and other workers can dequeue in the meantime.
+        let job = {
+            let recv = receiver.lock().unwrap();
+            recv.recv()
+        };
+        match job {
+            Ok(job) => {
+                // Contain job panics so this thread, and its TCS binding,
+                // stays in the pool. Unwinding drops the job's result
+                // sender, which is how the caller of `run` learns of it.
+                if panic::catch_unwind(AssertUnwindSafe(job)).is_err() {
+                    error!("ECALL worker {} caught a panic from a job", index);
+                }
+            }
+            Err(_) => {
+                debug!("ECALL worker {} exiting (channel closed)", index);
+                return;
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::EcallPool;
+    use std::sync::atomic::{AtomicUsize, Ordering};
+    use std::sync::Arc;
+    use std::thread;
+    use std::time::Duration;
+
+    #[test]
+    fn pool_limits_concurrent_jobs_to_worker_count() {
+        let pool = Arc::new(EcallPool::new(2));
+        let in_flight = Arc::new(AtomicUsize::new(0));
+        let observed_max = Arc::new(AtomicUsize::new(0));
+        let mut handles = Vec::new();
+        for _ in 0..6 {
+            let pool = Arc::clone(&pool);
+            let in_flight = Arc::clone(&in_flight);
+            let observed_max = Arc::clone(&observed_max);
+            handles.push(thread::spawn(move || {
+                pool.run(move || {
+                    let current = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
+                    observed_max.fetch_max(current, Ordering::SeqCst);
+                    thread::sleep(Duration::from_millis(40));
+                    in_flight.fetch_sub(1, Ordering::SeqCst);
+                });
+            }));
+        }
+        for handle in handles {
+            handle.join().unwrap();
+        }
+        assert_eq!(observed_max.load(Ordering::SeqCst), 2);
+    }
+
+    #[test]
+    fn pool_returns_job_result_to_caller() {
+        let pool = EcallPool::new(2);
+        let result = pool.run(|| 7 * 6);
+        assert_eq!(result, 42);
+    }
+
+    #[test]
+    fn pool_workers_have_stable_thread_ids_across_jobs() {
+        // Verifies the "1 thread = 1 TCS forever" property under BIND policy:
+        // the set of OS thread ids that execute jobs is bounded by pool size.
+        let pool = Arc::new(EcallPool::new(3));
+        let observed = Arc::new(std::sync::Mutex::new(
+            std::collections::HashSet::<thread::ThreadId>::new(),
+        ));
+        let mut handles = Vec::new();
+        for _ in 0..30 {
+            let pool = Arc::clone(&pool);
+            let observed = Arc::clone(&observed);
+            handles.push(thread::spawn(move || {
+                pool.run(move || {
+                    observed.lock().unwrap().insert(thread::current().id());
+                    thread::sleep(Duration::from_millis(5));
+                });
+            }));
+        }
+        for handle in handles {
+            handle.join().unwrap();
+        }
+        let set = observed.lock().unwrap();
+        assert!(
+            set.len() <= 3,
+            "expected at most pool-size distinct worker threads, saw {}",
+            set.len()
+        );
+    }
+
+    #[test]
+    fn pool_worker_survives_panicking_job() {
+        let pool = EcallPool::new(1);
+        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
+            pool.run::<_, ()>(|| panic!("job failure"))
+        }));
+        assert!(outcome.is_err());
+        // The pool's only worker must still be alive to serve the next job.
+        assert_eq!(pool.run(|| 1 + 1), 2);
+    }
+}
diff --git a/modules/service/src/elc.rs b/modules/service/src/elc.rs index d983d84e..0d98d989 100644 --- a/modules/service/src/elc.rs +++ b/modules/service/src/elc.rs @@ -34,7 +34,15 @@ where &self, request: Request, ) -> Result, Status> { - match self.app.enclave.proto_create_client(request.into_inner()) { + let inner = request.into_inner(); + let app = self.app.clone(); + let result = tokio::task::spawn_blocking(move || { + app.ecall_pool + .run(move || app.enclave.proto_create_client(inner)) + }) + .await + .map_err(|e| Status::aborted(format!("create client worker failed: {e}")))?; + match result { Ok(res) => Ok(Response::new(res)), Err(e) => Err(Status::aborted(e.to_string())), } @@ -48,8 +56,13 @@ where let client_id = msg.client_id.clone(); let service = self.clone(); let result = tokio::task::spawn_blocking(move || { - service.with_client_update_serialized(&client_id, || { - service.app.enclave.proto_update_client(msg) + let pool = service.app.ecall_pool.clone(); + let enclave = service.app.enclave.clone(); + service.with_client_update_serialized(&client_id, move || { + // The blocking-pool thread holds the per-client lock; the + // actual ECALL runs on an EcallPool worker so cumulative + // TCS bindings stay bounded. 
+ pool.run(move || enclave.proto_update_client(msg)) }) }) .await @@ -120,8 +133,10 @@ where let client_id = msg.client_id.clone(); let service = self.clone(); let result = tokio::task::spawn_blocking(move || { - service.with_client_update_serialized(&client_id, || { - service.app.enclave.proto_update_client(msg) + let pool = service.app.ecall_pool.clone(); + let enclave = service.app.enclave.clone(); + service.with_client_update_serialized(&client_id, move || { + pool.run(move || enclave.proto_update_client(msg)) }) }) .await @@ -231,11 +246,15 @@ where &self, request: Request, ) -> Result, Status> { - match self - .app - .enclave - .proto_aggregate_messages(request.into_inner()) - { + let inner = request.into_inner(); + let app = self.app.clone(); + let result = tokio::task::spawn_blocking(move || { + app.ecall_pool + .run(move || app.enclave.proto_aggregate_messages(inner)) + }) + .await + .map_err(|e| Status::aborted(format!("aggregate messages worker failed: {e}")))?; + match result { Ok(res) => Ok(Response::new(res)), Err(e) => Err(Status::aborted(e.to_string())), } @@ -245,11 +264,15 @@ where &self, request: Request, ) -> Result, Status> { - match self - .app - .enclave - .proto_verify_membership(request.into_inner()) - { + let inner = request.into_inner(); + let app = self.app.clone(); + let result = tokio::task::spawn_blocking(move || { + app.ecall_pool + .run(move || app.enclave.proto_verify_membership(inner)) + }) + .await + .map_err(|e| Status::aborted(format!("verify membership worker failed: {e}")))?; + match result { Ok(res) => Ok(Response::new(res)), Err(e) => Err(Status::aborted(e.to_string())), } @@ -259,11 +282,15 @@ where &self, request: Request, ) -> Result, Status> { - match self - .app - .enclave - .proto_verify_non_membership(request.into_inner()) - { + let inner = request.into_inner(); + let app = self.app.clone(); + let result = tokio::task::spawn_blocking(move || { + app.ecall_pool + .run(move || 
app.enclave.proto_verify_non_membership(inner)) + }) + .await + .map_err(|e| Status::aborted(format!("verify non-membership worker failed: {e}")))?; + match result { Ok(res) => Ok(Response::new(res)), Err(e) => Err(Status::aborted(e.to_string())), } @@ -273,14 +300,22 @@ where #[tonic::async_trait] impl Query for AppService where - S: CommitStore + TxAccessor + 'static, - E: EnclaveProtoAPI + 'static, + S: CommitStore + TxAccessor + Send + 'static, + E: EnclaveProtoAPI + Send + Sync + 'static, { async fn client( &self, request: Request, ) -> Result, Status> { - match self.enclave.proto_query_client(request.into_inner()) { + let inner = request.into_inner(); + let app = self.clone(); + let result = tokio::task::spawn_blocking(move || { + app.ecall_pool + .run(move || app.enclave.proto_query_client(inner)) + }) + .await + .map_err(|e| Status::aborted(format!("query client worker failed: {e}")))?; + match result { Ok(res) => Ok(Response::new(res)), Err(e) => Err(Status::aborted(e.to_string())), } diff --git a/modules/service/src/lib.rs b/modules/service/src/lib.rs index 6dec822e..247de9bf 100644 --- a/modules/service/src/lib.rs +++ b/modules/service/src/lib.rs @@ -1,4 +1,5 @@ mod client_lock; +mod ecall_pool; mod elc; mod enclave; mod service; diff --git a/modules/service/src/service.rs b/modules/service/src/service.rs index 2540a022..0f3e2d48 100644 --- a/modules/service/src/service.rs +++ b/modules/service/src/service.rs @@ -1,4 +1,5 @@ use crate::client_lock::ClientUpdateLocks; +use crate::ecall_pool::EcallPool; use crate::speculative::SpeculativeService; use anyhow::Result; use enclave_api::{EnclaveProtoAPI, SpeculativeEnclaveCommandAPI}; @@ -19,6 +20,12 @@ where { pub(crate) home: PathBuf, pub(crate) enclave: Arc, + /// Long-lived pool that owns the set of OS threads allowed to ECALL. 
+ /// All ECALL-issuing call sites in the gRPC layer dispatch through this + /// pool to keep the cumulative set of distinct host threads that ever + /// enter the enclave bounded by `--max-enclave-concurrency`. See + /// datachainlab/docs#10123 for the TCSPolicy=BIND rationale. + pub(crate) ecall_pool: Arc, _marker: PhantomData, } @@ -41,6 +48,7 @@ where Self { home: self.home.clone(), enclave: self.enclave.clone(), + ecall_pool: self.ecall_pool.clone(), _marker: Default::default(), } } @@ -65,10 +73,11 @@ where S: CommitStore + 'static, E: EnclaveProtoAPI + 'static, { - pub fn new>(home: P, enclave: E) -> Self { + pub fn new>(home: P, enclave: E, ecall_concurrency: usize) -> Self { AppService { home: home.into(), enclave: Arc::new(enclave), + ecall_pool: Arc::new(EcallPool::new(ecall_concurrency)), _marker: Default::default(), } } @@ -83,8 +92,9 @@ where home: P, enclave: E, speculative_concurrency_limit: usize, + ecall_concurrency: usize, ) -> Self { - let app = AppService::new(home, enclave); + let app = AppService::new(home, enclave, ecall_concurrency); let speculative = SpeculativeService::new(speculative_concurrency_limit); Self { app, diff --git a/modules/service/src/speculative/service.rs b/modules/service/src/speculative/service.rs index ce48a5f9..fa06e910 100644 --- a/modules/service/src/speculative/service.rs +++ b/modules/service/src/speculative/service.rs @@ -662,7 +662,7 @@ mod tests { fn stitch_rejects_first_base_state_that_is_not_in_store() { let client_id = "07-tendermint-0"; let enclave = FakeEnclave::new(Duration::from_millis(1)); - let app = AppService::::new("test-home", enclave); + let app = AppService::::new("test-home", enclave, 1); let service = SpeculativeService::new(1); let req = with_explicit_base_state_payload(mk_req( "unit-0000", @@ -718,7 +718,7 @@ mod tests { fn stitch_accepts_first_base_state_when_stored_consensus_and_state_id_match() { let client_id = "07-tendermint-0"; let enclave = FakeEnclave::new(Duration::from_millis(1)); - 
let app = AppService::::new("test-home", enclave); + let app = AppService::::new("test-home", enclave, 1); let service = SpeculativeService::new(1); let mut req = with_explicit_base_state_payload(mk_req( "unit-0000", @@ -761,7 +761,7 @@ mod tests { fn stitch_rejects_first_base_state_when_prev_state_id_is_missing() { let client_id = "07-tendermint-0"; let enclave = FakeEnclave::new(Duration::from_millis(1)); - let app = AppService::::new("test-home", enclave); + let app = AppService::::new("test-home", enclave, 1); let service = SpeculativeService::new(1); let req = with_explicit_base_state_payload(mk_req( "unit-0000", @@ -811,7 +811,7 @@ mod tests { fn stitch_rejects_first_base_state_when_stored_state_id_is_missing() { let client_id = "07-tendermint-0"; let enclave = FakeEnclave::new(Duration::from_millis(1)); - let app = AppService::::new("test-home", enclave); + let app = AppService::::new("test-home", enclave, 1); let service = SpeculativeService::new(1); let mut req = with_explicit_base_state_payload(mk_req( "unit-0000", @@ -869,7 +869,7 @@ mod tests { fn stitch_rejects_first_base_state_when_state_id_mismatch() { let client_id = "07-tendermint-0"; let enclave = FakeEnclave::new(Duration::from_millis(1)); - let app = AppService::::new("test-home", enclave); + let app = AppService::::new("test-home", enclave, 1); let service = SpeculativeService::new(1); let mut req = with_explicit_base_state_payload(mk_req( "unit-0000", @@ -927,7 +927,7 @@ mod tests { fn stitch_rejects_first_base_state_when_client_state_does_not_match_state_id() { let client_id = "07-tendermint-0"; let enclave = FakeEnclave::new(Duration::from_millis(1)); - let app = AppService::::new("test-home", enclave); + let app = AppService::::new("test-home", enclave, 1); let service = SpeculativeService::new(1); let mut req = with_explicit_base_state_payload(mk_req( "unit-0000", @@ -994,7 +994,7 @@ mod tests { fn stitch_rejects_first_base_state_when_canonical_client_state_advanced() { let client_id = 
"07-tendermint-0"; let enclave = FakeEnclave::new(Duration::from_millis(1)); - let app = AppService::::new("test-home", enclave); + let app = AppService::::new("test-home", enclave, 1); let service = SpeculativeService::new(1); let mut req = with_explicit_base_state_payload(mk_req( "unit-0000", @@ -1054,7 +1054,7 @@ mod tests { fn streaming_speculative_batch_executes_before_input_closes() { let client_id = "07-tendermint-0"; let enclave = FakeEnclave::new(Duration::from_millis(100)); - let app = AppService::::new("test-home", enclave); + let app = AppService::::new("test-home", enclave, 1); let service = SpeculativeService::new(1); let (tx, rx) = std::sync::mpsc::sync_channel(2); let worker_service = service.clone(); @@ -1151,7 +1151,7 @@ mod tests { fn streaming_speculative_batch_rejects_channel_close_without_complete() { let client_id = "07-tendermint-0"; let enclave = FakeEnclave::new(Duration::from_millis(1)); - let app = AppService::::new("test-home", enclave); + let app = AppService::::new("test-home", enclave, 1); let service = SpeculativeService::new(1); let (tx, rx) = std::sync::mpsc::sync_channel(1); let worker_service = service.clone(); @@ -1202,7 +1202,7 @@ mod tests { fn streaming_speculative_batch_execution_does_not_apply_until_stitched() { let client_id = "07-tendermint-0"; let enclave = FakeEnclave::new(Duration::from_millis(1)); - let app = AppService::::new("test-home", enclave); + let app = AppService::::new("test-home", enclave, 1); let service = SpeculativeService::new(1); let (tx, rx) = std::sync::mpsc::sync_channel(2); let worker_service = service.clone(); @@ -1258,7 +1258,7 @@ mod tests { fn streaming_speculative_batch_rejects_incomplete_base_state() { let client_id = "07-tendermint-0"; let enclave = FakeEnclave::new(Duration::from_millis(1)); - let app = AppService::::new("test-home", enclave); + let app = AppService::::new("test-home", enclave, 1); let service = SpeculativeService::new(2); let (tx, rx) = std::sync::mpsc::sync_channel(2); 
let worker_service = service.clone(); @@ -1312,7 +1312,7 @@ mod tests { fn streaming_speculative_batch_parallelizes_complete_base_state_units() { let client_id = "07-tendermint-0"; let enclave = FakeEnclave::new(Duration::from_millis(100)); - let app = AppService::::new("test-home", enclave); + let app = AppService::::new("test-home", enclave, 1); let service = SpeculativeService::new(3); let (tx, rx) = std::sync::mpsc::sync_channel(3); let worker_service = service.clone();