Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions app/src/commands/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@ pub struct Start {
help = "Worker thread number the tokio `Runtime` will use"
)]
pub threads: Option<usize>,
/// Maximum concurrent enclave ECALLs across serial and speculative paths.
/// Set this to match the loaded enclave's `TCSNum`; the default assumes a
/// conservative TCS budget of 4.
/// Size of the dedicated ECALL worker pool that owns the set of OS
/// threads allowed to enter the enclave. Under `TCSPolicy=BIND` the
/// Intel SGX SDK pins one TCS to each ECALL-issuing thread for the
/// thread's lifetime, so this value also bounds the cumulative number
/// of TCS bindings created by the service. Set this to a value strictly
/// less than the enclave's `TCSNum` to leave headroom for the SDK
/// runtime and any speculative path that spawns ad-hoc workers.
#[clap(
long = "max-enclave-concurrency",
default_value_t = 4,
help = "Maximum concurrent enclave ECALLs"
help = "Size of the dedicated ECALL worker pool"
)]
pub max_enclave_concurrency: usize,
/// Maximum concurrent speculative update-client requests.
Expand Down Expand Up @@ -89,7 +93,12 @@ impl ServiceCmd {
enclave_parallelism
);
}
let srv = ElcService::new(opts.get_home(), enclave, speculative_concurrency_limit);
let srv = ElcService::new(
opts.get_home(),
enclave,
speculative_concurrency_limit,
enclave_parallelism,
);

info!(
"start service: addr={addr} mrenclave={mrenclave} speculative_concurrency_limit={} enclave_parallelism={}",
Expand Down
171 changes: 171 additions & 0 deletions modules/service/src/ecall_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
use log::*;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Type-erased unit of work handed to a pool worker through the job channel.
/// It is boxed so closures of different concrete types can share one queue,
/// and `Send + 'static` because it is executed on a different thread than the
/// one that created it.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// A fixed-size pool of long-lived OS threads dedicated to executing ECALLs.
///
/// Under `TCSPolicy=BIND`, the Intel SGX SDK binds a TCS to each host thread
/// on its first ECALL and only releases the binding when the thread
/// terminates. Without an upper bound on the set of distinct threads that
/// ever ECALL, cumulative bindings can exceed `TCSNum` even when concurrent
/// ECALLs stay well below it, producing transient `SGX_ERROR_OUT_OF_TCS`
/// failures. See datachainlab/docs#10123.
///
/// `EcallPool` solves this by pinning ECALL execution to exactly `size`
/// permanent worker threads. Workers are spawned once at service start and
/// live for the entire process lifetime; their TCS bindings are therefore
/// stable at `size` and never accumulate.
///
/// Jobs are submitted through an `mpsc` channel and picked up by whichever
/// worker is free. Dropping the pool closes the channel and joins every
/// worker.
pub struct EcallPool {
/// Submitting half of the job channel. Wrapped in `Option` so that `Drop`
/// can take and drop it, which disconnects the channel and lets the
/// workers leave their receive loop.
sender: Option<Sender<Job>>,
/// Join handles of the spawned worker threads; drained and joined in
/// `Drop`.
workers: Vec<JoinHandle<()>>,
}

impl EcallPool {
    /// Creates a pool with `size` permanent worker threads.
    ///
    /// A `size` of zero is raised to one so the pool can always make
    /// progress. Each worker is named `ecall-<index>` and runs
    /// `ecall_worker_loop` until the job channel is closed.
    /// Callers should set `size` equal to `--max-enclave-concurrency`.
    ///
    /// # Panics
    ///
    /// Panics if the operating system fails to spawn a worker thread.
    pub fn new(size: usize) -> Self {
        let size = size.max(1);
        let (sender, receiver) = channel::<Job>();
        // `mpsc::Receiver` has a single consumer, so the workers share it
        // behind a mutex and take turns pulling jobs.
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..size)
            .map(|i| {
                let receiver = Arc::clone(&receiver);
                thread::Builder::new()
                    .name(format!("ecall-{}", i))
                    .spawn(move || ecall_worker_loop(i, receiver))
                    .expect("failed to spawn ECALL pool worker")
            })
            .collect();
        Self {
            sender: Some(sender),
            workers,
        }
    }

    /// Runs `f` on one of the pool's worker threads, blocking the caller
    /// until the job completes, and returns the value produced by `f`.
    ///
    /// Jobs queue up when every worker is busy, so at most `size` closures
    /// execute at any moment. Because the caller blocks, invoking `run` from
    /// inside a job that is itself running on this pool can deadlock once all
    /// workers are occupied; call it only from threads outside the pool.
    ///
    /// # Panics
    ///
    /// * If `f` panics, the panic is caught on the worker (which therefore
    ///   stays alive and keeps serving jobs) and re-raised on the calling
    ///   thread with the original payload.
    /// * If the pool has already been shut down or no worker is left to
    ///   receive the job.
    pub fn run<F, R>(&self, f: F) -> R
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let sender = self
            .sender
            .as_ref()
            .expect("ECALL pool used after shutdown");
        // One-shot reply channel carrying either the result or the panic
        // payload of `f`.
        let (tx, rx) = channel::<thread::Result<R>>();
        let job: Job = Box::new(move || {
            // Catch the panic here so it never unwinds through the worker
            // loop: a dead worker would permanently shrink the pool.
            // `AssertUnwindSafe` is fine because `f` is consumed and nothing
            // it captured is observed again after a panic.
            let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(f));
            // The caller may have gone away; nothing useful to do then.
            let _ = tx.send(outcome);
        });
        sender
            .send(job)
            .expect("ECALL pool worker channel closed");
        match rx
            .recv()
            .expect("ECALL pool worker terminated before producing a result")
        {
            Ok(value) => value,
            // Surface the job's own panic to the caller instead of a generic
            // "worker terminated" message.
            Err(payload) => std::panic::resume_unwind(payload),
        }
    }
}

impl Drop for EcallPool {
    /// Shuts the pool down by disconnecting the job channel and waiting for
    /// every worker thread to exit.
    fn drop(&mut self) {
        // Releasing the sender disconnects the channel; each worker then sees
        // `Err` from `recv` and leaves its loop.
        self.sender = None;
        // Join in spawn order so that all workers have fully terminated by
        // the time `drop` returns. A worker that ended by panicking is only
        // reported, never re-raised.
        self.workers.drain(..).for_each(|handle| match handle.join() {
            Ok(()) => {}
            Err(e) => warn!("ECALL pool worker panicked at shutdown: {:?}", e),
        });
    }
}

fn ecall_worker_loop(index: usize, receiver: Arc<Mutex<std::sync::mpsc::Receiver<Job>>>) {
debug!("ECALL worker {} started", index);
loop {
let job = {
let recv = receiver.lock().unwrap();
recv.recv()
};
match job {
Ok(job) => job(),
Err(_) => {
debug!("ECALL worker {} exiting (channel closed)", index);
return;
}
}
}
}

#[cfg(test)]
mod tests {
    use super::EcallPool;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    /// Six callers compete for two workers; the highest number of jobs seen
    /// running at once must equal the worker count.
    #[test]
    fn pool_limits_concurrent_jobs_to_worker_count() {
        let pool = Arc::new(EcallPool::new(2));
        let running = Arc::new(AtomicUsize::new(0));
        let peak = Arc::new(AtomicUsize::new(0));

        let callers: Vec<_> = (0..6)
            .map(|_| {
                let pool = Arc::clone(&pool);
                let running = Arc::clone(&running);
                let peak = Arc::clone(&peak);
                thread::spawn(move || {
                    pool.run(move || {
                        let now = running.fetch_add(1, Ordering::SeqCst) + 1;
                        peak.fetch_max(now, Ordering::SeqCst);
                        thread::sleep(Duration::from_millis(40));
                        running.fetch_sub(1, Ordering::SeqCst);
                    });
                })
            })
            .collect();

        for caller in callers {
            caller.join().unwrap();
        }
        assert_eq!(peak.load(Ordering::SeqCst), 2);
    }

    /// The value computed on the worker is handed back to the caller.
    #[test]
    fn pool_returns_job_result_to_caller() {
        let pool = EcallPool::new(2);
        assert_eq!(pool.run(|| 7 * 6), 42);
    }

    /// Verifies the "1 thread = 1 TCS forever" property under BIND policy:
    /// however many callers submit work, the set of thread ids that actually
    /// execute jobs never grows beyond the pool size.
    #[test]
    fn pool_workers_have_stable_thread_ids_across_jobs() {
        let pool = Arc::new(EcallPool::new(3));
        let seen_ids = Arc::new(std::sync::Mutex::new(
            std::collections::HashSet::<thread::ThreadId>::new(),
        ));

        let callers: Vec<_> = (0..30)
            .map(|_| {
                let pool = Arc::clone(&pool);
                let seen_ids = Arc::clone(&seen_ids);
                thread::spawn(move || {
                    pool.run(move || {
                        seen_ids.lock().unwrap().insert(thread::current().id());
                        thread::sleep(Duration::from_millis(5));
                    });
                })
            })
            .collect();

        for caller in callers {
            caller.join().unwrap();
        }

        let distinct = seen_ids.lock().unwrap();
        assert!(
            distinct.len() <= 3,
            "expected at most pool-size distinct worker threads, saw {}",
            distinct.len()
        );
    }
}
81 changes: 58 additions & 23 deletions modules/service/src/elc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,15 @@ where
&self,
request: Request<MsgCreateClient>,
) -> Result<Response<MsgCreateClientResponse>, Status> {
match self.app.enclave.proto_create_client(request.into_inner()) {
let inner = request.into_inner();
let app = self.app.clone();
let result = tokio::task::spawn_blocking(move || {
app.ecall_pool
.run(move || app.enclave.proto_create_client(inner))
})
.await
.map_err(|e| Status::aborted(format!("create client worker failed: {e}")))?;
match result {
Ok(res) => Ok(Response::new(res)),
Err(e) => Err(Status::aborted(e.to_string())),
}
Expand All @@ -48,8 +56,13 @@ where
let client_id = msg.client_id.clone();
let service = self.clone();
let result = tokio::task::spawn_blocking(move || {
service.with_client_update_serialized(&client_id, || {
service.app.enclave.proto_update_client(msg)
let pool = service.app.ecall_pool.clone();
let enclave = service.app.enclave.clone();
service.with_client_update_serialized(&client_id, move || {
// The blocking-pool thread holds the per-client lock; the
// actual ECALL runs on an EcallPool worker so cumulative
// TCS bindings stay bounded.
pool.run(move || enclave.proto_update_client(msg))
})
})
.await
Expand Down Expand Up @@ -120,8 +133,10 @@ where
let client_id = msg.client_id.clone();
let service = self.clone();
let result = tokio::task::spawn_blocking(move || {
service.with_client_update_serialized(&client_id, || {
service.app.enclave.proto_update_client(msg)
let pool = service.app.ecall_pool.clone();
let enclave = service.app.enclave.clone();
service.with_client_update_serialized(&client_id, move || {
pool.run(move || enclave.proto_update_client(msg))
})
})
.await
Expand Down Expand Up @@ -231,11 +246,15 @@ where
&self,
request: Request<MsgAggregateMessages>,
) -> Result<Response<MsgAggregateMessagesResponse>, Status> {
match self
.app
.enclave
.proto_aggregate_messages(request.into_inner())
{
let inner = request.into_inner();
let app = self.app.clone();
let result = tokio::task::spawn_blocking(move || {
app.ecall_pool
.run(move || app.enclave.proto_aggregate_messages(inner))
})
.await
.map_err(|e| Status::aborted(format!("aggregate messages worker failed: {e}")))?;
match result {
Ok(res) => Ok(Response::new(res)),
Err(e) => Err(Status::aborted(e.to_string())),
}
Expand All @@ -245,11 +264,15 @@ where
&self,
request: Request<MsgVerifyMembership>,
) -> Result<Response<MsgVerifyMembershipResponse>, Status> {
match self
.app
.enclave
.proto_verify_membership(request.into_inner())
{
let inner = request.into_inner();
let app = self.app.clone();
let result = tokio::task::spawn_blocking(move || {
app.ecall_pool
.run(move || app.enclave.proto_verify_membership(inner))
})
.await
.map_err(|e| Status::aborted(format!("verify membership worker failed: {e}")))?;
match result {
Ok(res) => Ok(Response::new(res)),
Err(e) => Err(Status::aborted(e.to_string())),
}
Expand All @@ -259,11 +282,15 @@ where
&self,
request: Request<MsgVerifyNonMembership>,
) -> Result<Response<MsgVerifyNonMembershipResponse>, Status> {
match self
.app
.enclave
.proto_verify_non_membership(request.into_inner())
{
let inner = request.into_inner();
let app = self.app.clone();
let result = tokio::task::spawn_blocking(move || {
app.ecall_pool
.run(move || app.enclave.proto_verify_non_membership(inner))
})
.await
.map_err(|e| Status::aborted(format!("verify non-membership worker failed: {e}")))?;
match result {
Ok(res) => Ok(Response::new(res)),
Err(e) => Err(Status::aborted(e.to_string())),
}
Expand All @@ -273,14 +300,22 @@ where
#[tonic::async_trait]
impl<E, S> Query for AppService<E, S>
where
S: CommitStore + TxAccessor + 'static,
E: EnclaveProtoAPI<S> + 'static,
S: CommitStore + TxAccessor + Send + 'static,
E: EnclaveProtoAPI<S> + Send + Sync + 'static,
{
async fn client(
&self,
request: Request<QueryClientRequest>,
) -> Result<Response<QueryClientResponse>, Status> {
match self.enclave.proto_query_client(request.into_inner()) {
let inner = request.into_inner();
let app = self.clone();
let result = tokio::task::spawn_blocking(move || {
app.ecall_pool
.run(move || app.enclave.proto_query_client(inner))
})
.await
.map_err(|e| Status::aborted(format!("query client worker failed: {e}")))?;
match result {
Ok(res) => Ok(Response::new(res)),
Err(e) => Err(Status::aborted(e.to_string())),
}
Expand Down
1 change: 1 addition & 0 deletions modules/service/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod client_lock;
mod ecall_pool;
mod elc;
mod enclave;
mod service;
Expand Down
14 changes: 12 additions & 2 deletions modules/service/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::client_lock::ClientUpdateLocks;
use crate::ecall_pool::EcallPool;
use crate::speculative::SpeculativeService;
use anyhow::Result;
use enclave_api::{EnclaveProtoAPI, SpeculativeEnclaveCommandAPI};
Expand All @@ -19,6 +20,12 @@ where
{
pub(crate) home: PathBuf,
pub(crate) enclave: Arc<E>,
/// Long-lived pool that owns the set of OS threads allowed to ECALL.
/// All ECALL-issuing call sites in the gRPC layer dispatch through this
/// pool to keep the cumulative set of distinct host threads that ever
/// enter the enclave bounded by `--max-enclave-concurrency`. See
/// datachainlab/docs#10123 for the TCSPolicy=BIND rationale.
pub(crate) ecall_pool: Arc<EcallPool>,
_marker: PhantomData<S>,
}

Expand All @@ -41,6 +48,7 @@ where
Self {
home: self.home.clone(),
enclave: self.enclave.clone(),
ecall_pool: self.ecall_pool.clone(),
_marker: Default::default(),
}
}
Expand All @@ -65,10 +73,11 @@ where
S: CommitStore + 'static,
E: EnclaveProtoAPI<S> + 'static,
{
pub fn new<P: Into<PathBuf>>(home: P, enclave: E) -> Self {
pub fn new<P: Into<PathBuf>>(home: P, enclave: E, ecall_concurrency: usize) -> Self {
AppService {
home: home.into(),
enclave: Arc::new(enclave),
ecall_pool: Arc::new(EcallPool::new(ecall_concurrency)),
_marker: Default::default(),
}
}
Expand All @@ -83,8 +92,9 @@ where
home: P,
enclave: E,
speculative_concurrency_limit: usize,
ecall_concurrency: usize,
) -> Self {
let app = AppService::new(home, enclave);
let app = AppService::new(home, enclave, ecall_concurrency);
let speculative = SpeculativeService::new(speculative_concurrency_limit);
Self {
app,
Expand Down
Loading
Loading