From 9bd915fc453fedb14def265a0d3d4c0c3aa8ca34 Mon Sep 17 00:00:00 2001 From: Luis Gonzalez Date: Thu, 29 Jan 2026 21:20:02 -0500 Subject: [PATCH] added concurrency to celery in docker_compose.yml to run multiple tasks in parallel, before it was set up as pool=solo this will only allow Celery to run multiple tasks in parallel --- c_logging_config.py | 0 docker-compose.yml | 4 ++-- fluent-bit/fluent-bit.conf | 0 worker.py | 22 ++++++++++------------ 4 files changed, 12 insertions(+), 14 deletions(-) create mode 100644 c_logging_config.py create mode 100644 fluent-bit/fluent-bit.conf diff --git a/c_logging_config.py b/c_logging_config.py new file mode 100644 index 0000000..e69de29 diff --git a/docker-compose.yml b/docker-compose.yml index 4a873a5..75a26cc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -36,9 +36,9 @@ services: build: . volumes: - ./data:/app/data - command: celery -A worker worker --loglevel=info --pool=solo --concurrency=4 + command: celery -A worker worker --loglevel=info --concurrency=8 env_file: ./config/.env - deploy: + deploy: resources: limits: memory: 512M diff --git a/fluent-bit/fluent-bit.conf b/fluent-bit/fluent-bit.conf new file mode 100644 index 0000000..e69de29 diff --git a/worker.py b/worker.py index 4b53b9c..a99a1be 100644 --- a/worker.py +++ b/worker.py @@ -129,7 +129,7 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500, git remaining_api_calls = github_instance.rate_limiting remaining = remaining_api_calls[0] - if int(counter) >= int(500): + if remaining_api_calls == 1: print(f"Reached batch size limit of {batch_size}") break @@ -178,21 +178,19 @@ def aggregate_results(results): def build_repo_chord(total: int = 5000, batch_size: int = 500): header = [ - get_github_data.s(start, batch_size) - for start in range(0, total, batch_size) + get_github_data.s(start, batch_size) for start in range(0, total, batch_size) ] return chord(header)(aggregate_results.s()) - # old code that did not work -@app.task -def distribute_tasks(): +# @app.task +# def distribute_tasks(): - jobs = group([ - get_github_data.s(start, 500) - for start in range(0, 5000, 500) - ]) +# jobs = group([ +# get_github_data.s(start, 500) +# for start in range(0, 5000, 500) +# ]) - return chord(jobs)() - \ No newline at end of file +# return chord(jobs)() + \ No newline at end of file