Fixed Polars data type issue where the data was not saved properly #27
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| from airflow.sdk import dag, task | ||
| from pendulum import datetime | ||
|
|
||
| @dag( | ||
| schedule="@hourly", | ||
| start_date=datetime(2026, 2, 2), | ||
| description="Run Celery queue with RabbitMQ as the broker \ | ||
| in order to get GitHub data from the GitHub API", | ||
| tags=["celery_queue"], | ||
| max_consecutive_failed_dag_runs=3, | ||
| ) | ||
| def run_queue(): | ||
|
|
||
| @task | ||
| def run_the_queue(): | ||
| print("hello") | ||
|
|
||
| run_the_queue() | ||
|
|
||
|
|
||
| run_queue() | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -7,7 +7,7 @@ | |||||
| import polars as pl | ||||||
|
|
||||||
|
|
||||||
| today = datetime.now(timezone.utc).strftime("%Y-%m-%d") | ||||||
| today = datetime.now().strftime("%Y-%m-%d") | ||||||
|
Contributor
This change removes timezone awareness from the
Suggested change
|
||||||
|
|
||||||
| print("Waiting for Celery task to complete") | ||||||
|
|
||||||
|
|
||||||
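The review comment above concerns the switch from `datetime.now(timezone.utc)` to `datetime.now()`. As a minimal sketch of why that can matter for a date-based label, the example below formats one fixed instant in UTC and in a hypothetical UTC+2 host zone. The helper name `date_label`, the fixed instant, and the offset are assumptions made for illustration and do not come from the PR.

```python
from datetime import datetime, timedelta, timezone


def date_label(now: datetime) -> str:
    """Format a timestamp with the same "%Y-%m-%d" pattern used in the diff."""
    return now.strftime("%Y-%m-%d")


# A fixed instant shortly before midnight UTC (used instead of the real clock
# so the result is reproducible).
instant = datetime(2026, 2, 2, 23, 30, tzinfo=timezone.utc)

# The same instant as seen on a host whose local zone is UTC+2 (hypothetical).
local_zone = timezone(timedelta(hours=2))
local_view = instant.astimezone(local_zone)

utc_label = date_label(instant)        # "2026-02-02"
local_label = date_label(local_view)   # "2026-02-03"
print(utc_label, local_label)

# datetime.now() without an argument returns a naive value (no tzinfo),
# while datetime.now(timezone.utc) returns an aware one.
naive_now = datetime.now()
aware_now = datetime.now(timezone.utc)
print(naive_now.tzinfo, aware_now.tzinfo)
```

With a naive `datetime.now()`, the label depends on the host's local zone, so two machines in different zones could write to different date folders for the same run.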
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,10 +1,14 @@ | ||||||||||||||||||||
| import duckdb | ||||||||||||||||||||
|
|
||||||||||||||||||||
| df = duckdb.read_parquet("../2026-01-14/hey.parquet") | ||||||||||||||||||||
| duckdb.sql("SET display_max_rows=10000") | ||||||||||||||||||||
|
|
||||||||||||||||||||
| duckdb.sql("DESCRIBE SELECT * FROM df").show() | ||||||||||||||||||||
| df = duckdb.read_parquet("../2026-02-02/github_data.parquet") | ||||||||||||||||||||
|
|
||||||||||||||||||||
| duckdb.sql("SELECT language, COUNT(language) AS c_p \ | ||||||||||||||||||||
| FROM df \ | ||||||||||||||||||||
| GROUP BY language \ | ||||||||||||||||||||
| ORDER BY c_p DESC").show() | ||||||||||||||||||||
| # duckdb.sql("DESCRIBE SELECT * FROM df").show() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| # duckdb.sql("SELECT language, COUNT(language) AS c_p \ | ||||||||||||||||||||
| # FROM df \ | ||||||||||||||||||||
| # GROUP BY language \ | ||||||||||||||||||||
| # ORDER BY c_p DESC").show() | ||||||||||||||||||||
|
|
||||||||||||||||||||
| duckdb.sql("DESCRIBE SELECT * FROM df").show() | ||||||||||||||||||||
|
Comment on lines +7 to +14
Contributor
This section contains commented-out code and a duplicated line (
Suggested change
|
||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -84,6 +84,50 @@ services: | |
| celery_worker: | ||
| condition: service_healthy | ||
|
|
||
|
|
||
| # airflow-init: | ||
| # image: apache/airflow:2.9.2 | ||
| # environment: | ||
| # AIRFLOW__CORE__EXECUTOR: LocalExecutor | ||
| # AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: sqlite:////opt/airflow/airflow.db | ||
| # AIRFLOW__CORE__LOAD_EXAMPLES: "false" | ||
| # volumes: | ||
| # - ./airflow/dags:/opt/airflow/dags | ||
| # - ./:/opt/airflow/project | ||
| # - airflow_data:/opt/airflow | ||
| # command: ["airflow", "db", "init"] | ||
|
|
||
| # airflow-scheduler: | ||
| # image: apache/airflow:2.9.2 | ||
| # depends_on: | ||
| # - airflow-init | ||
| # environment: | ||
| # AIRFLOW__CORE__EXECUTOR: LocalExecutor | ||
| # AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: sqlite:////opt/airflow/airflow.db | ||
| # AIRFLOW__CORE__LOAD_EXAMPLES: "false" | ||
| # volumes: | ||
| # - ./airflow/dags:/opt/airflow/dags | ||
| # - ./:/opt/airflow/project | ||
| # - airflow_data:/opt/airflow | ||
| # command: ["airflow", "scheduler"] | ||
|
|
||
| # airflow-webserver: | ||
| # image: apache/airflow:2.9.2 | ||
| # depends_on: | ||
| # - airflow-init | ||
| # environment: | ||
| # AIRFLOW__CORE__EXECUTOR: LocalExecutor | ||
| # AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: sqlite:////opt/airflow/airflow.db | ||
| # AIRFLOW__CORE__LOAD_EXAMPLES: "false" | ||
| # volumes: | ||
| # - ./airflow/dags:/opt/airflow/dags | ||
| # - ./:/opt/airflow/project | ||
| # - airflow_data:/opt/airflow | ||
| # ports: | ||
| # - "8080:8080" | ||
| # command: ["airflow", "webserver"] | ||
|
Comment on lines +88 to +128
Contributor
A large block of commented-out configuration for Airflow has been added. If this is not actively used, it should be removed to keep the
||
|
|
||
|
|
||
| volumes: | ||
| rabbitmq_data: | ||
| redis_data: | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -6,7 +6,7 @@ | |||||
| class RabbitMQ_Data_Validation(BaseModel): | ||||||
| # ===== MESSAGE METADATA ===== | ||||||
| message_id: str | ||||||
| got_data_in: str = Field(default_factory=lambda: datetime.now().isoformat()) | ||||||
| got_data_in: datetime = Field(default_factory=lambda: datetime.now()) | ||||||
|
Contributor
Using
Suggested change
|
||||||
|
|
||||||
| # ===== BASIC INFO ===== | ||||||
| repo_id: int | ||||||
|
|
||||||
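The diff above changes `got_data_in` from a `str` holding an ISO string to a `datetime`, while keeping a `default_factory`. The sketch below illustrates that pattern with a standard-library `dataclass` rather than the Pydantic `BaseModel` used in the PR, so `field(default_factory=...)` stands in for Pydantic's `Field(default_factory=...)`. The class name `MessageMetadata`, the helper `utc_now`, and the choice of UTC are assumptions for illustration only.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


def utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


@dataclass
class MessageMetadata:  # hypothetical stand-in for RabbitMQ_Data_Validation
    message_id: str
    # The factory is called once per instance, so each record gets its own timestamp.
    got_data_in: datetime = field(default_factory=utc_now)


first = MessageMetadata(message_id="a")
second = MessageMetadata(message_id="b")

# The field holds a real datetime; an ISO string can still be derived when needed.
print(type(first.got_data_in).__name__, first.got_data_in.isoformat())
```

Keeping the field as a `datetime` leaves the string conversion to the point of serialization, which is presumably what lets the downstream writer store a proper timestamp type instead of text.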
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,7 +15,7 @@ | |
|
|
||
|
|
||
| logger = get_task_logger(__name__) | ||
| todays_date = datetime.now().isoformat() | ||
| todays_date = datetime.now() | ||
|
Contributor
Defining
||
| S3_BUCKET_NAME = "github-etl-data-bucket" | ||
|
|
||
|
|
||
|
|
@@ -70,7 +70,7 @@ def get_github_data(self, start_in_repo_num: int = 0, batch_size: int = 500, git | |
| try: | ||
| github_data_points = { | ||
| "message_id": self.request.id, | ||
| "got_data_in": todays_date if todays_date else datetime.now().isoformat(), | ||
| "got_data_in": todays_date if todays_date else datetime.now(), | ||
| "repo_id": repo.id if repo.id is not None else 0, | ||
| "name": repo.name if repo.name else "unknown", | ||
| "full_name": repo.full_name if repo.full_name else "unknown/unknown", | ||
|
|
||
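In the diff above, `todays_date` appears to be assigned at module level and then used inside the task as `todays_date if todays_date else datetime.now()`. The sketch below shows two properties of that pattern in plain Python: a module-level value is computed once at import, and a `datetime` object is always truthy, so the fallback branch is never taken. The function names `build_record_frozen` and `build_record_fresh` are hypothetical and not part of the PR.

```python
import time
from datetime import datetime

# Evaluated once, when the module is imported (mirrors `todays_date` in the diff).
todays_date = datetime.now()


def build_record_frozen() -> dict:
    # A datetime object is always truthy, so the else branch never runs.
    return {"got_data_in": todays_date if todays_date else datetime.now()}


def build_record_fresh() -> dict:
    # Hypothetical alternative: read the clock on every call.
    return {"got_data_in": datetime.now()}


first = build_record_frozen()
time.sleep(0.05)  # simulate time passing between two task runs
second = build_record_frozen()
fresh = build_record_fresh()

print(first["got_data_in"] == second["got_data_in"])  # same frozen timestamp
print(fresh["got_data_in"] > first["got_data_in"])    # fresh value is later
```

In a long-running worker process, the frozen variant would keep stamping records with the time the module was loaded, which may or may not be the intent here.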
The task `run_the_queue` is currently a placeholder that only prints "hello". This is misleading given the DAG's name (`run_queue`) and its description, which states it's for running a Celery queue. The task should contain the logic to actually trigger or run the Celery/RabbitMQ process. If this is a work in progress, consider adding a `TODO` comment to make the placeholder nature explicit.