Merged
17 changes: 0 additions & 17 deletions airflow/dags/my_dag.py

This file was deleted.

21 changes: 21 additions & 0 deletions airflow/dags/run_queue.py
@@ -0,0 +1,21 @@
+from airflow.sdk import dag, task
+from pendulum import datetime
+
+@dag(
+    schedule="@hourly",
+    start_date=datetime(2026, 2, 2),
+    description="Run Celery queue with RabbitMQ as the broker \
+    in order to get GitHub data from the GitHub API",
+    tags=["celery_queue"],
+    max_consecutive_failed_dag_runs=3,
+)
+def run_queue():
+
+    @task
+    def run_the_queue():
+        print("hello")
+
+    run_the_queue()
Comment on lines +14 to +18

high

The task run_the_queue is currently a placeholder that only prints "hello". This is misleading given the DAG's name (run_queue) and its description, which states it's for running a Celery queue. The task should contain the logic to actually trigger or run the Celery/RabbitMQ process. If this is a work in progress, consider adding a TODO comment to make the placeholder nature explicit.

Suggested change
-    @task
-    def run_the_queue():
-        print("hello")
-    run_the_queue()
+    @task
+    def run_the_queue():
+        # TODO: Implement logic to run the Celery queue for GitHub data processing.
+        print("hello")
+    run_the_queue()



run_queue()
2 changes: 1 addition & 1 deletion client.py
@@ -7,7 +7,7 @@
import polars as pl


-today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
+today = datetime.now().strftime("%Y-%m-%d")

medium

This change removes timezone awareness from the today variable by switching from datetime.now(timezone.utc) to datetime.now(). A naive datetime takes its value from the host's local clock, so the same instant can format to different dates on machines in different timezones, which is a common source of bugs in distributed systems. Prefer timezone-aware datetimes throughout.

Suggested change
-today = datetime.now().strftime("%Y-%m-%d")
+today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
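
To make the ambiguity concrete, here is a minimal sketch using only the standard library. The helper name `utc_date_string` and the fixed example instant are illustrative assumptions, not code from client.py; the point is only that one instant can format to two different calendar dates depending on the timezone attached to it.

```python
from datetime import datetime, timedelta, timezone


def utc_date_string(now=None):
    """Format a moment as YYYY-MM-DD in UTC (hypothetical helper)."""
    if now is None:
        now = datetime.now(timezone.utc)
    # Normalise to UTC before formatting so every host agrees on the date.
    return now.astimezone(timezone.utc).strftime("%Y-%m-%d")


# Late evening on 2 Feb in a UTC-5 zone is already 3 Feb in UTC.
local_evening = datetime(2026, 2, 2, 22, 30, tzinfo=timezone(timedelta(hours=-5)))

local_label = local_evening.strftime("%Y-%m-%d")  # date as seen on the local clock
utc_label = utc_date_string(local_evening)        # date as seen in UTC

print(local_label, utc_label)
```

If `today` feeds a date-partitioned path (an assumption; the diff does not show how it is used), the UTC form keeps the client and the worker agreeing on the partition regardless of where each one runs.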


print("Waiting for Celery task to complete")

16 changes: 10 additions & 6 deletions data/analytics/ducky.py
@@ -1,10 +1,14 @@
 import duckdb

-df = duckdb.read_parquet("../2026-01-14/hey.parquet")
+duckdb.sql("SET display_max_rows=10000")

-duckdb.sql("DESCRIBE SELECT * FROM df").show()
+df = duckdb.read_parquet("../2026-02-02/github_data.parquet")

-duckdb.sql("SELECT language, COUNT(language) AS c_p \
-FROM df \
-GROUP BY language \
-ORDER BY c_p DESC").show()
+# duckdb.sql("DESCRIBE SELECT * FROM df").show()
+
+# duckdb.sql("SELECT language, COUNT(language) AS c_p \
+# FROM df \
+# GROUP BY language \
+# ORDER BY c_p DESC").show()
+
+duckdb.sql("DESCRIBE SELECT * FROM df").show()
Comment on lines +7 to +14

medium

This section contains commented-out code and a duplicated statement: duckdb.sql("DESCRIBE SELECT * FROM df").show() appears once commented out and once live. To improve clarity and maintainability, remove the unused, commented-out code.

Suggested change
-# duckdb.sql("DESCRIBE SELECT * FROM df").show()
-# duckdb.sql("SELECT language, COUNT(language) AS c_p \
-# FROM df \
-# GROUP BY language \
-# ORDER BY c_p DESC").show()
-duckdb.sql("DESCRIBE SELECT * FROM df").show()
+duckdb.sql("DESCRIBE SELECT * FROM df").show()

44 changes: 44 additions & 0 deletions docker-compose.yml
@@ -84,6 +84,50 @@ services:
       celery_worker:
         condition: service_healthy

+
+  # airflow-init:
+  #   image: apache/airflow:2.9.2
+  #   environment:
+  #     AIRFLOW__CORE__EXECUTOR: LocalExecutor
+  #     AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: sqlite:////opt/airflow/airflow.db
+  #     AIRFLOW__CORE__LOAD_EXAMPLES: "false"
+  #   volumes:
+  #     - ./airflow/dags:/opt/airflow/dags
+  #     - ./:/opt/airflow/project
+  #     - airflow_data:/opt/airflow
+  #   command: ["airflow", "db", "init"]
+
+  # airflow-scheduler:
+  #   image: apache/airflow:2.9.2
+  #   depends_on:
+  #     - airflow-init
+  #   environment:
+  #     AIRFLOW__CORE__EXECUTOR: LocalExecutor
+  #     AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: sqlite:////opt/airflow/airflow.db
+  #     AIRFLOW__CORE__LOAD_EXAMPLES: "false"
+  #   volumes:
+  #     - ./airflow/dags:/opt/airflow/dags
+  #     - ./:/opt/airflow/project
+  #     - airflow_data:/opt/airflow
+  #   command: ["airflow", "scheduler"]
+
+  # airflow-webserver:
+  #   image: apache/airflow:2.9.2
+  #   depends_on:
+  #     - airflow-init
+  #   environment:
+  #     AIRFLOW__CORE__EXECUTOR: LocalExecutor
+  #     AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: sqlite:////opt/airflow/airflow.db
+  #     AIRFLOW__CORE__LOAD_EXAMPLES: "false"
+  #   volumes:
+  #     - ./airflow/dags:/opt/airflow/dags
+  #     - ./:/opt/airflow/project
+  #     - airflow_data:/opt/airflow
+  #   ports:
+  #     - "8080:8080"
+  #   command: ["airflow", "webserver"]
Comment on lines +88 to +128

medium

A large block of commented-out configuration for Airflow has been added. If this is not actively used, it should be removed to keep the docker-compose.yml file clean and focused on the running services. If this is for documentation or an alternative setup, consider moving it to a separate file (e.g., docker-compose.airflow.yml) or into the project's documentation.



 volumes:
   rabbitmq_data:
   redis_data:
2 changes: 1 addition & 1 deletion pydantic_models/github.py
@@ -6,7 +6,7 @@
 class RabbitMQ_Data_Validation(BaseModel):
     # ===== MESSAGE METADATA =====
     message_id: str
-    got_data_in: str = Field(default_factory=lambda: datetime.now().isoformat())
+    got_data_in: datetime = Field(default_factory=lambda: datetime.now())

medium

Using datetime.now() creates a naive datetime object, which can lead to issues with timezones. It is a best practice to use timezone-aware datetimes to avoid ambiguity. You can use datetime.now(timezone.utc) for this. Note that you will need to add from datetime import timezone at the top of the file.

Suggested change
-    got_data_in: datetime = Field(default_factory=lambda: datetime.now())
+    got_data_in: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
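
As a rough illustration of the suggested default, the sketch below uses a standard-library dataclass as a stand-in for the Pydantic model, so it runs without third-party packages. `MessageMetadata` and `utc_now` are hypothetical names, and dataclass `field(default_factory=...)` is only an analogue of Pydantic's `Field(default_factory=...)`, not the same API. It shows that a timezone-aware default is created per instance and carries its UTC offset through an ISO 8601 round trip, which matters now that the field is a datetime rather than a pre-formatted string.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


def utc_now() -> datetime:
    """Return the current moment as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


@dataclass
class MessageMetadata:
    """Hypothetical stand-in for the metadata fields of RabbitMQ_Data_Validation."""
    message_id: str
    # The factory runs once per instance, not once at class definition time.
    got_data_in: datetime = field(default_factory=utc_now)


msg = MessageMetadata(message_id="example-id")

# An aware datetime keeps its offset when serialised, so it can be parsed
# back to exactly the same instant.
serialized = msg.got_data_in.isoformat()
restored = datetime.fromisoformat(serialized)

print(serialized)
```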


     # ===== BASIC INFO =====
     repo_id: int
4 changes: 2 additions & 2 deletions worker.py
@@ -15,7 +15,7 @@


logger = get_task_logger(__name__)
-todays_date = datetime.now().isoformat()
+todays_date = datetime.now()

critical

Defining todays_date at the module level is a critical issue. It gets initialized only once when the worker module is imported, causing all tasks to use the same, stale timestamp. This affects not only got_data_in but also the fallbacks for created_at, updated_at, and pushed_at. To ensure data accuracy, the timestamp should be generated inside the get_github_data task each time it runs. It is also recommended to use timezone-aware datetimes, e.g., datetime.now(timezone.utc) (which would require importing timezone from datetime).
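
The staleness described above can be reproduced with a small standard-library sketch. `IMPORTED_AT`, `stale_timestamp`, and `fresh_timestamp` are hypothetical names chosen for illustration, and the short sleep stands in for the time between two task runs. The sketch also mirrors the `x if x else ...` fallback from the diff: a datetime instance is always truthy, so that fallback presumably never takes its else branch.

```python
import time
from datetime import datetime, timezone

# Evaluated once, when the module is imported (the pattern flagged above).
IMPORTED_AT = datetime.now(timezone.utc)


def stale_timestamp() -> datetime:
    """Return the import-time value, no matter when the task runs."""
    # A datetime is always truthy, so the else branch here is never taken.
    return IMPORTED_AT if IMPORTED_AT else datetime.now(timezone.utc)


def fresh_timestamp() -> datetime:
    """Generate the timestamp inside the task body, once per run."""
    return datetime.now(timezone.utc)


first_stale, first_fresh = stale_timestamp(), fresh_timestamp()
time.sleep(0.05)  # stand-in for time passing between two task runs
second_stale, second_fresh = stale_timestamp(), fresh_timestamp()

print(first_stale == second_stale, second_fresh > first_fresh)
```

In a long-lived worker process the gap between import and execution can be hours or days, so moving the call inside the task body is what keeps got_data_in accurate.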

S3_BUCKET_NAME = "github-etl-data-bucket"


@@ -70,7 +70,7 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500, git
     try:
         github_data_points = {
             "message_id": self.request.id,
-            "got_data_in": todays_date if todays_date else datetime.now().isoformat(),
+            "got_data_in": todays_date if todays_date else datetime.now(),
             "repo_id": repo.id if repo.id is not None else 0,
             "name": repo.name if repo.name else "unknown",
             "full_name": repo.full_name if repo.full_name else "unknown/unknown",