From 9adcb864425e7bde527f74ef4becc97de9bbcf4c Mon Sep 17 00:00:00 2001
From: Luis Gonzalez
Date: Mon, 2 Feb 2026 14:07:22 -0500
Subject: [PATCH] fixed Polars data type issue, where it would not save the data
 properly because a datetime variable would be str instead of datetime

---
Review notes (everything between the "---" line above and the first
"diff --git" line is ignored by `git am`):

* The copy of this patch under review had its line breaks and leading
  whitespace collapsed. The line structure below was rebuilt from the hunk
  headers (every hunk keeps its original old/new line counts). Indentation
  inside hunks and the position of blank context lines are reconstructed by
  convention -- verify against the tree before applying.
* airflow/dags/run_queue.py: the DAG description used a backslash
  continuation inside the string literal, which embeds the next line's
  indentation in the text. It now uses adjacent string literals.
* worker.py: `todays_date if todays_date else ...` could never take the else
  branch (a datetime object is always truthy), and `todays_date` is a
  module-level value computed once at import. `got_data_in` now calls
  datetime.now() for each record.
* pydantic_models/github.py: `default_factory=datetime.now` replaces the
  equivalent lambda wrapper.
* NOTE(review): client.py drops `timezone.utc` and worker.py uses naive
  `datetime.now()`; confirm local time is what the dated data folder
  should follow.
* NOTE(review): run_queue.py imports `airflow.sdk` while the commented-out
  compose services pin apache/airflow:2.9.2; confirm the image version
  before enabling them.
* NOTE(review): the post-image blob ids on the `index` lines of the edited
  files were not recomputed.

 airflow/dags/my_dag.py    | 17 ---------------
 airflow/dags/run_queue.py | 21 +++++++++++++++++++
 client.py                 |  2 +-
 data/analytics/ducky.py   | 16 ++++++++------
 docker-compose.yml        | 44 +++++++++++++++++++++++++++++++++++++++
 pydantic_models/github.py |  2 +-
 worker.py                 |  4 ++--
 7 files changed, 79 insertions(+), 27 deletions(-)
 delete mode 100644 airflow/dags/my_dag.py
 create mode 100644 airflow/dags/run_queue.py

diff --git a/airflow/dags/my_dag.py b/airflow/dags/my_dag.py
deleted file mode 100644
index 295f07b..0000000
--- a/airflow/dags/my_dag.py
+++ /dev/null
@@ -1,17 +0,0 @@
-from airflow.sdk import dag, task
-from pendulum import datetime
-
-@dag(
-    schedule="@daily",
-    start_date=datetime(2026, 1, 31),
-    description="test dag",
-    tags=["first dag"],
-    max_consecutive_failed_dag_runs=3,
-)
-def my_dag():
-
-    @task
-    def _task_a():
-        print("hello")
-
-    _task_a()
\ No newline at end of file
diff --git a/airflow/dags/run_queue.py b/airflow/dags/run_queue.py
new file mode 100644
index 0000000..a8ab096
--- /dev/null
+++ b/airflow/dags/run_queue.py
@@ -0,0 +1,21 @@
+from airflow.sdk import dag, task
+from pendulum import datetime
+
+@dag(
+    schedule="@hourly",
+    start_date=datetime(2026, 2, 2),
+    description="Run Celery queue with RabbitMQ as the broker "
+                "in order to get GitHub data from the GitHub API",
+    tags=["celery_queue"],
+    max_consecutive_failed_dag_runs=3,
+)
+def run_queue():
+
+    @task
+    def run_the_queue():
+        print("hello")
+
+    run_the_queue()
+
+
+run_queue()
\ No newline at end of file
diff --git a/client.py b/client.py
index 5bd8507..6b792b6 100644
--- a/client.py
+++ b/client.py
@@ -7,7 +7,7 @@ import polars as pl
 
 
 
-today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
+today = datetime.now().strftime("%Y-%m-%d")
 
 print("Waiting for Celery task to complete")
 
diff --git a/data/analytics/ducky.py b/data/analytics/ducky.py
index a9c7701..384bcc6 100644
--- a/data/analytics/ducky.py
+++ b/data/analytics/ducky.py
@@ -1,10 +1,14 @@
 import duckdb
 
-df = duckdb.read_parquet("../2026-01-14/hey.parquet")
+duckdb.sql("SET display_max_rows=10000")
 
-duckdb.sql("DESCRIBE SELECT * FROM df").show()
+df = duckdb.read_parquet("../2026-02-02/github_data.parquet")
 
-duckdb.sql("SELECT language, COUNT(language) AS c_p \
-            FROM df \
-            GROUP BY language \
-            ORDER BY c_p DESC").show()
\ No newline at end of file
+# duckdb.sql("DESCRIBE SELECT * FROM df").show()
+
+# duckdb.sql("SELECT language, COUNT(language) AS c_p \
+#             FROM df \
+#             GROUP BY language \
+#             ORDER BY c_p DESC").show()
+
+duckdb.sql("DESCRIBE SELECT * FROM df").show()
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index 75a26cc..1b2cf20 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -84,6 +84,50 @@ services:
       celery_worker:
         condition: service_healthy
 
+
+  # airflow-init:
+  #   image: apache/airflow:2.9.2
+  #   environment:
+  #     AIRFLOW__CORE__EXECUTOR: LocalExecutor
+  #     AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: sqlite:////opt/airflow/airflow.db
+  #     AIRFLOW__CORE__LOAD_EXAMPLES: "false"
+  #   volumes:
+  #     - ./airflow/dags:/opt/airflow/dags
+  #     - ./:/opt/airflow/project
+  #     - airflow_data:/opt/airflow
+  #   command: ["airflow", "db", "init"]
+
+  # airflow-scheduler:
+  #   image: apache/airflow:2.9.2
+  #   depends_on:
+  #     - airflow-init
+  #   environment:
+  #     AIRFLOW__CORE__EXECUTOR: LocalExecutor
+  #     AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: sqlite:////opt/airflow/airflow.db
+  #     AIRFLOW__CORE__LOAD_EXAMPLES: "false"
+  #   volumes:
+  #     - ./airflow/dags:/opt/airflow/dags
+  #     - ./:/opt/airflow/project
+  #     - airflow_data:/opt/airflow
+  #   command: ["airflow", "scheduler"]
+
+  # airflow-webserver:
+  #   image: apache/airflow:2.9.2
+  #   depends_on:
+  #     - airflow-init
+  #   environment:
+  #     AIRFLOW__CORE__EXECUTOR: LocalExecutor
+  #     AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: sqlite:////opt/airflow/airflow.db
+  #     AIRFLOW__CORE__LOAD_EXAMPLES: "false"
+  #   volumes:
+  #     - ./airflow/dags:/opt/airflow/dags
+  #     - ./:/opt/airflow/project
+  #     - airflow_data:/opt/airflow
+  #   ports:
+  #     - "8080:8080"
+  #   command: ["airflow", "webserver"]
+
+
 volumes:
   rabbitmq_data:
   redis_data:
\ No newline at end of file
diff --git a/pydantic_models/github.py b/pydantic_models/github.py
index 2460eb4..c616d5c 100644
--- a/pydantic_models/github.py
+++ b/pydantic_models/github.py
@@ -6,7 +6,7 @@ class RabbitMQ_Data_Validation(BaseModel):
 
     # ===== MESSAGE METADATA =====
     message_id: str
-    got_data_in: str = Field(default_factory=lambda: datetime.now().isoformat())
+    got_data_in: datetime = Field(default_factory=datetime.now)
 
     # ===== BASIC INFO =====
     repo_id: int
diff --git a/worker.py b/worker.py
index 55506ea..8e37ee3 100644
--- a/worker.py
+++ b/worker.py
@@ -15,7 +15,7 @@ logger = get_task_logger(__name__)
 
 
 
-todays_date = datetime.now().isoformat()
+todays_date = datetime.now()
 
 S3_BUCKET_NAME = "github-etl-data-bucket"
 
@@ -70,7 +70,7 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500, git
         try:
             github_data_points = {
                 "message_id": self.request.id,
-                "got_data_in": todays_date if todays_date else datetime.now().isoformat(),
+                "got_data_in": datetime.now(),
                 "repo_id": repo.id if repo.id is not None else 0,
                 "name": repo.name if repo.name else "unknown",
                 "full_name": repo.full_name if repo.full_name else "unknown/unknown",