diff --git a/include/ctranslate2/replica_pool.h b/include/ctranslate2/replica_pool.h index 8c8e15d8e..f0afa5dd7 100644 --- a/include/ctranslate2/replica_pool.h +++ b/include/ctranslate2/replica_pool.h @@ -309,6 +309,7 @@ namespace ctranslate2 { , _device_index(model->device_index()) , _num_threads(num_threads) , _allocator(nullptr) + , _shutting_down(false) { set_model(model); } @@ -346,13 +347,27 @@ namespace ctranslate2 { _allocator = &get_allocator(_device); } + void prepare_shutdown() override { + // Set shutdown flag BEFORE queue.close() so that idle() won't enter + // synchronize_stream() while the queue is being closed. This prevents + // the deadlock where idle() blocks on CUDA sync while close() has + // already signaled workers to stop. + _shutting_down.store(true, std::memory_order_release); + } + void idle() override { + // Check shutdown flag before synchronizing — synchronize_stream() can + // block indefinitely on CUDA teardown if called during shutdown. + if (_shutting_down.load(std::memory_order_acquire)) + return; + // When no new jobs are immediately available, we synchronize the CUDA stream // so that the CudaAsyncAllocator can release some memory. synchronize_stream(_device); } void finalize() override { + _shutting_down.store(true, std::memory_order_release); _replica.reset(); } @@ -362,6 +377,7 @@ namespace ctranslate2 { const size_t _num_threads; Allocator* _allocator; std::unique_ptr _replica; + std::atomic _shutting_down; }; } diff --git a/include/ctranslate2/thread_pool.h b/include/ctranslate2/thread_pool.h index 826b7e57f..3032cc104 100644 --- a/include/ctranslate2/thread_pool.h +++ b/include/ctranslate2/thread_pool.h @@ -59,7 +59,16 @@ namespace ctranslate2 { virtual ~Worker() = default; void start(JobQueue& job_queue, int thread_affinity = -1); - void join(); + + // Join the worker thread. If timeout_ms > 0 and the thread doesn't finish + // in time, the thread is detached to prevent indefinite blocking. + // This is critical on Windows where CUDA cleanup can hang during shutdown. + void join(int timeout_ms = 5000); + + // Signal the worker to stop idle operations (e.g., synchronize_stream). + // Called BEFORE queue.close() to prevent the race where idle() blocks + // on CUDA synchronization while the queue is being closed. + virtual void prepare_shutdown() {} protected: // Called before the work loop. diff --git a/src/thread_pool.cc b/src/thread_pool.cc index d0aad775b..6e0f754fc 100644 --- a/src/thread_pool.cc +++ b/src/thread_pool.cc @@ -1,5 +1,9 @@ #include "ctranslate2/thread_pool.h" +#include +#include +#include + #include "ctranslate2/utils.h" namespace ctranslate2 { @@ -46,9 +50,19 @@ namespace ctranslate2 { std::unique_ptr JobQueue::get(const std::function& before_wait) { std::unique_lock lock(_mutex); - if (!can_get_job()) { - if (before_wait) + while (!can_get_job()) { + // Only call before_wait() if we're not shutting down. + // before_wait() may call cudaStreamSynchronize() which can block + // indefinitely if there's pending GPU work, preventing clean shutdown. + if (before_wait && !_request_end) { + // Release lock before calling before_wait() to avoid holding mutex + // during potentially blocking CUDA operations. + lock.unlock(); before_wait(); + lock.lock(); + // Re-check condition after re-acquiring lock + continue; + } _can_get_job.wait(lock, [this]{ return can_get_job(); }); } @@ -102,8 +116,41 @@ namespace ctranslate2 { set_thread_affinity(_thread, thread_affinity); } - void Worker::join() { - _thread.join(); + void Worker::join(int timeout_ms) { + if (!_thread.joinable()) { + return; + } + + if (timeout_ms <= 0) { + _thread.join(); + return; + } + + // Timed join using a helper thread. If the worker doesn't finish within + // the timeout (e.g., stuck in CUDA teardown on Windows), detach both + // threads to prevent blocking the process indefinitely. + std::promise join_promise; + std::future join_future = join_promise.get_future(); + + std::thread join_helper([this, &join_promise]() { + if (_thread.joinable()) { + _thread.join(); + } + join_promise.set_value(); + }); + + auto status = join_future.wait_for(std::chrono::milliseconds(timeout_ms)); + + if (status == std::future_status::ready) { + join_helper.join(); + } else { + std::cerr << "[CTranslate2] Warning: Worker thread join timed out after " + << timeout_ms << "ms, detaching to prevent hang\n"; + join_helper.detach(); + if (_thread.joinable()) { + _thread.detach(); + } + } } void Worker::run(JobQueue& job_queue) { @@ -146,6 +193,12 @@ namespace ctranslate2 { } ThreadPool::~ThreadPool() { + // Signal all workers to stop idle synchronization BEFORE closing the queue. + // This prevents the race where a worker enters synchronize_stream() inside + // idle() just before close() sets _request_end. Without this, the worker + // blocks on CUDA sync while the queue is already closed. + for (auto& worker : _workers) + worker->prepare_shutdown(); _queue.close(); for (auto& worker : _workers) worker->join();