Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions include/ctranslate2/replica_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ namespace ctranslate2 {
, _device_index(model->device_index())
, _num_threads(num_threads)
, _allocator(nullptr)
, _shutting_down(false)
{
set_model(model);
}
Expand Down Expand Up @@ -346,13 +347,27 @@ namespace ctranslate2 {
_allocator = &get_allocator(_device);
}

// Marks this worker as shutting down so that later calls to idle() return
// immediately instead of calling synchronize_stream().
void prepare_shutdown() override {
// The flag is meant to be set BEFORE the job queue is closed, so that a
// worker that becomes idle during shutdown does not start a new stream
// synchronization that could block while close() is signaling workers to stop.
// NOTE(review): idle() only tests the flag on entry, so a worker that is
// already inside synchronize_stream() when the flag is set is not interrupted.
// The release store pairs with the acquire load in idle().
_shutting_down.store(true, std::memory_order_release);
}

// Idle hook: synchronizes the device stream unless shutdown has started.
void idle() override {
  const bool shutdown_requested = _shutting_down.load(std::memory_order_acquire);

  // Only synchronize while the worker is still live: synchronize_stream() can
  // block indefinitely on CUDA teardown if it is called during shutdown.
  if (!shutdown_requested) {
    // No new job is immediately available, so synchronizing the CUDA stream
    // here lets the CudaAsyncAllocator release some memory.
    synchronize_stream(_device);
  }
}

// Final cleanup hook: flags the worker as shutting down, then destroys the
// replica it owns.
void finalize() override {
// Set the flag unconditionally (it may already have been set by
// prepare_shutdown()) so idle() cannot start a stream synchronization
// from this point on.
_shutting_down.store(true, std::memory_order_release);
// Drop the owned Replica; _replica is empty afterwards.
_replica.reset();
}

Expand All @@ -362,6 +377,7 @@ namespace ctranslate2 {
const size_t _num_threads;
Allocator* _allocator;
std::unique_ptr<Replica> _replica;
std::atomic<bool> _shutting_down;
};

}
11 changes: 10 additions & 1 deletion include/ctranslate2/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,16 @@ namespace ctranslate2 {
virtual ~Worker() = default;

void start(JobQueue& job_queue, int thread_affinity = -1);
void join();

// Join the worker thread.
//  - If the thread is not joinable, this returns immediately.
//  - If timeout_ms <= 0, this blocks until the thread finishes (plain join).
//  - If timeout_ms > 0 and the thread doesn't finish within timeout_ms
//    milliseconds, a warning is written to stderr and the caller stops
//    waiting; the worker thread is left running in the background so the
//    caller is not blocked indefinitely.
// This is critical on Windows where CUDA cleanup can hang during shutdown.
void join(int timeout_ms = 5000);

// Signal the worker to stop idle operations (e.g., synchronize_stream).
// ThreadPool's destructor calls this on every worker BEFORE closing the job
// queue, to narrow the race where idle() blocks on CUDA synchronization
// while the queue is being closed.
// The default implementation does nothing; workers with blocking idle work
// should override it.
virtual void prepare_shutdown() {}

protected:
// Called before the work loop.
Expand Down
61 changes: 57 additions & 4 deletions src/thread_pool.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#include "ctranslate2/thread_pool.h"

#include <chrono>
#include <future>
#include <iostream>

#include "ctranslate2/utils.h"

namespace ctranslate2 {
Expand Down Expand Up @@ -46,9 +50,19 @@ namespace ctranslate2 {
std::unique_ptr<Job> JobQueue::get(const std::function<void()>& before_wait) {
std::unique_lock<std::mutex> lock(_mutex);

if (!can_get_job()) {
if (before_wait)
while (!can_get_job()) {
// Only call before_wait() if we're not shutting down.
// before_wait() may call cudaStreamSynchronize() which can block
// indefinitely if there's pending GPU work, preventing clean shutdown.
if (before_wait && !_request_end) {
// Release lock before calling before_wait() to avoid holding mutex
// during potentially blocking CUDA operations.
lock.unlock();
before_wait();
lock.lock();
// Re-check condition after re-acquiring lock
continue;
}
_can_get_job.wait(lock, [this]{ return can_get_job(); });
}

Expand Down Expand Up @@ -102,8 +116,41 @@ namespace ctranslate2 {
set_thread_affinity(_thread, thread_affinity);
}

void Worker::join() {
_thread.join();
void Worker::join(int timeout_ms) {
if (!_thread.joinable()) {
return;
}

if (timeout_ms <= 0) {
_thread.join();
return;
}

// Timed join using a helper thread. If the worker doesn't finish within
// the timeout (e.g., stuck in CUDA teardown on Windows), detach both
// threads to prevent blocking the process indefinitely.
std::promise<void> join_promise;
std::future<void> join_future = join_promise.get_future();

std::thread join_helper([this, &join_promise]() {
if (_thread.joinable()) {
_thread.join();
}
join_promise.set_value();
});

auto status = join_future.wait_for(std::chrono::milliseconds(timeout_ms));

if (status == std::future_status::ready) {
join_helper.join();
} else {
std::cerr << "[CTranslate2] Warning: Worker thread join timed out after "
<< timeout_ms << "ms, detaching to prevent hang\n";
join_helper.detach();
if (_thread.joinable()) {
_thread.detach();
}
}
}

void Worker::run(JobQueue& job_queue) {
Expand Down Expand Up @@ -146,6 +193,12 @@ namespace ctranslate2 {
}

ThreadPool::~ThreadPool() {
// Signal all workers to stop idle synchronization BEFORE closing the queue.
// This prevents the race where a worker enters synchronize_stream() inside
// idle() just before close() sets _request_end. Without this, the worker
// blocks on CUDA sync while the queue is already closed.
for (auto& worker : _workers)
worker->prepare_shutdown();
_queue.close();
for (auto& worker : _workers)
worker->join();
Expand Down
Loading