-
Notifications
You must be signed in to change notification settings - Fork 0
Feature: Sharefile to Snowflake DAG Class #65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
6fe9c36
8ce7eb9
40cac6f
d550d0c
36d9651
7c4734f
5d590b9
efb510d
db969c8
b335d66
2fa3399
58a0332
5ec70e4
929d9ad
f7178ac
9ef191f
111dc3b
248f4d6
89a340b
85f91eb
9882e64
9644caf
74e8615
01e8438
9a90112
bd8408e
bd50225
6eff93a
df7cc14
32b37ff
e791ca2
0f4ef7c
6f57bf6
344ada0
950ca35
3dab30f
f76a18a
47b0166
5baa75b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,213 @@ | ||
| import logging | ||
| import os | ||
| from datetime import datetime | ||
| from typing import Callable, List, Optional, Union | ||
|
|
||
| from airflow import DAG | ||
| from airflow.models import Param | ||
| from airflow.operators.python import PythonOperator | ||
|
|
||
| from edu_edfi_airflow.callables import s3 | ||
|
|
||
| from ea_airflow_util.callables.airflow import skip_if_not_in_params_list | ||
| from ea_airflow_util.callables import jsonl, snowflake | ||
| from ea_airflow_util.providers.sharefile.transfers.sharefile_to_disk import SharefileToDiskOperator | ||
| from ea_airflow_util.providers.aws.operators.s3 import S3ToSnowflakeOperator | ||
| from ea_airflow_util.callables.airflow import xcom_pull_template | ||
| from ea_airflow_util.providers.sharefile.hooks.sharefile import SharefileHook | ||
|
|
||
| from airflow.providers.amazon.aws.hooks.s3 import S3Hook | ||
| from ea_airflow_util import EACustomDAG | ||
|
|
||
| logging.basicConfig( | ||
| level=logging.INFO, | ||
| format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", | ||
| ) | ||
|
|
||
| class SharefileTransferToSnowflakeDagBuilder: | ||
| """ | ||
| This class is responsible for creating an Apache Airflow DAG that automates the process of transferring | ||
| files from ShareFile to Snowflake. | ||
|
|
||
| Parameters: | ||
| - dag_id (str): ID of the DAG to be created. | ||
| - airflow_default_args (dict): Default arguments to pass to the DAG. | ||
| - file_sources (dict): A mapping of file source names to their ShareFile paths. | ||
| - local_base_path (str): Base local path for downloading files. | ||
| - sharefile_conn_id (str): Airflow connection ID for ShareFile. | ||
| - base_s3_destination_key (str): Base S3 key (path prefix) for uploads. | ||
| - s3_conn_id (str): Airflow connection ID for AWS S3. | ||
| - snowflake_conn_id (str): Airflow connection ID for Snowflake. | ||
| - database (str): Snowflake database name. | ||
| - schema (str): Snowflake schema name. | ||
| - full_refresh (bool): If True, performs a full refresh load in Snowflake. | ||
| - schedule_interval (str or timedelta, optional): Airflow schedule interval for the DAG. | ||
| - transform_csv_to_jsonl (bool, optional): If True, converts downloaded CSV files to JSONL format. | ||
| - delete_remote (bool, optional): If True, deletes the original file from ShareFile after download. | ||
| - delete_local_csv (bool, optional): If True, deletes the CSV after transforming to JSONL. | ||
| - transfer_s3_to_snowflake (BaseOperator, optional): Custom operator for the S3 to Snowflake transfer step. | ||
| - **kwargs: Additional arguments passed to Airflow operators. | ||
| """ | ||
| def __init__(self, | ||
| dag_id: str, | ||
| airflow_default_args: dict, | ||
| file_sources: dict, | ||
| local_base_path: str, | ||
| sharefile_conn_id: str, | ||
| base_s3_destination_key: str, | ||
| s3_conn_id: str, | ||
| snowflake_conn_id: str, | ||
| database: str, | ||
| schema: str, | ||
| full_refresh: bool, | ||
| schedule_interval = None, | ||
| transform_csv_to_jsonl: bool = False, | ||
| delete_remote: bool = False, | ||
| delete_local_csv: bool = False, | ||
| transfer_s3_to_snowflake = None, | ||
| **kwargs | ||
| ): | ||
| self.dag_id = dag_id | ||
| self.airflow_default_args = airflow_default_args | ||
| self.file_sources = file_sources | ||
| self.schedule_interval = schedule_interval | ||
|
|
||
| self.local_base_path = local_base_path | ||
| self.transform_csv_to_jsonl = transform_csv_to_jsonl | ||
|
|
||
| self.sharefile_conn_id = sharefile_conn_id | ||
| self.delete_remote = delete_remote | ||
|
|
||
| self.delete_local_csv = delete_local_csv | ||
|
|
||
| self.base_s3_destination_key = base_s3_destination_key | ||
| self.s3_conn_id = s3_conn_id | ||
|
|
||
| self.snowflake_conn_id = snowflake_conn_id | ||
| self.database = database | ||
| self.schema = schema | ||
| self.full_refresh = full_refresh | ||
| self.transfer_s3_to_snowflake = transfer_s3_to_snowflake | ||
|
|
||
| self.logger = logging.getLogger(self.__class__.__name__) | ||
|
|
||
| self.params_dict = { | ||
| "file_sources": Param( | ||
| default=list(self.file_sources.keys()) if self.file_sources else [], | ||
| examples=list(self.file_sources.keys()), | ||
| type="array", | ||
| description="Newline-separated list of file sources to pull from ShareFile", | ||
| ), | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My understanding of the ShareFile API is that it's possible to fetch files in at least three ways:
I believe we primarily use (1) and (3) for various use-cases. (2) is probably less important. It seems like this code does (3), I can't quite tell if it does (1). What did you test with?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So, this particular operator that I'm using (SharefileToDiskOperator) only takes a path. It pulls in all files in the given directory. Should I look for a way to input one file at a time? @tomreitz
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure. A single path is probably fine for now, at least. This could be good to ask other DEs about, to get a sense of how existing ShareFile DAGs work (on the idea that eventually everyone would migrate their DAGs to use this class). |
||
|
|
||
| self.dag = EACustomDAG(dag_id=self.dag_id, | ||
| default_args=self.airflow_default_args, | ||
| schedule_interval=self.schedule_interval, | ||
| params=self.params_dict, | ||
| catchup=False | ||
| ) | ||
|
|
||
def build_python_operator(self,
    python_callable: Callable,
    **kwargs
) -> PythonOperator:
    """
    Wrap a Python callable in a PythonOperator attached to this builder's DAG.

    The task ID is "<run_type>__preprocess_python_callable__<callable name>" when the
    builder has a `run_type` attribute, and "preprocess_python_callable__<callable name>"
    otherwise.

    :param python_callable: Callable to execute; its name (minus lambda brackets) names the task.
    :param kwargs: Keyword arguments forwarded to the callable as `op_kwargs`.
    :return: The PythonOperator registered on `self.dag`.
    """
    # Objects such as functools.partial have no __name__; fall back to the type name.
    raw_name = getattr(python_callable, "__name__", type(python_callable).__name__)
    callable_name = raw_name.strip('<>')  # Remove brackets around lambdas

    # `run_type` and `pool` are not set by __init__, so read them defensively
    # instead of raising AttributeError on every call.
    run_type = getattr(self, "run_type", None)
    pool = getattr(self, "pool", None)  # None lets Airflow use its default pool

    task_id = f"preprocess_python_callable__{callable_name}"
    if run_type:
        task_id = f"{run_type}__{task_id}"

    # NOTE(review): the task ID depends only on the callable name, so building this
    # operator more than once for the same callable in one DAG will collide.
    # `provide_context` is omitted: Airflow 2 passes the context automatically.
    return PythonOperator(
        task_id=task_id,
        python_callable=python_callable,
        op_kwargs=kwargs,
        pool=pool,
        dag=self.dag
    )
|
|
||
| def build_sharefile_to_snowflake_dag(self, **kwargs): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It could be helpful if engineers could use parts of this DAG but override other parts when needed. A specific motivating example: I recently had to build a ShareFile-to-Snowflake DAG; I would be excited to use this class instead, but I'm not sure I can... because of the custom Snowflake file format (see the example below). You already allow passing in kwargs, which is good. You could leverage that to allow engineers using this class to pass in overrides for specific tasks (maybe other things too). For example, if you did
if not transfer_s3_to_snowflake:
    transfer_s3_to_snowflake = S3ToSnowflakeOperator(
...
then I could use most of the class and just override the one task:
def transfer_s3_to_snowflake():
...
snowflake_hook = SnowflakeHook(snowflake_conn_id=snowflake_conn_id)
snowflake_conn = snowflake_hook.get_connection(snowflake_conn_id)
database = snowflake_conn.extra_dejson["database"]
schema = "..."
endpoint = "..."
...
qry_copy_into = f"""
COPY INTO {database}.{schema}.{endpoint}
(pull_date, pull_timestamp, file_row_number, filename, {header})
FROM (
SELECT
TO_DATE('{mtime_date}') AS pull_date,
TO_TIMESTAMP('{mtime_timestamp}') AS pull_timestamp,
metadata$file_row_number AS file_row_number,
metadata$filename AS filename
...
FROM @{database}.util.airflow_stage/{s3_file}
(file_format => 'csv_iso88591') t -- HERE'S THE CUSTOM FILE FORMAT...
)
force = true;
"""
cursor_log = snowflake_hook.run(sql=[qry_delete, qry_copy_into], autocommit=False)
my_dag = SharefileTransferToSnowflakeDagBuilder(...)
my_dag.build_sharefile_to_snowflake_dag(transfer_s3_to_snowflake)
I'd be curious for @jayckaiser's opinion about this - it's similar to, but less explicit than, how we allow custom transform scripts to be run on files in the SFTPtoSnowflake DAG class. (Oh, and do we need that functionality here too?) |
||
| """ | ||
| Builds the DAG with tasks for each file source, including: | ||
| - Conditional execution based on DAG parameters. | ||
| - Downloading from ShareFile to local disk (supports both folder and single file paths). | ||
| - Optional transformation from CSV to JSONL. | ||
| - Upload to S3. | ||
| - Load into Snowflake using the default or a custom operator. | ||
|
|
||
| Returns: | ||
| airflow.DAG: The constructed Airflow DAG instance. | ||
| """ | ||
| for file, details in self.file_sources.items(): | ||
|
|
||
| check_if_file_in_param = PythonOperator( | ||
| task_id=f"check_{file}", | ||
| python_callable=skip_if_not_in_params_list, | ||
| op_kwargs={ | ||
| 'param_name': "file_sources", | ||
| 'value': file | ||
| }, | ||
| dag=self.dag, | ||
| **kwargs | ||
| ) | ||
|
|
||
| transfer_sharefile_to_disk = SharefileToDiskOperator( | ||
| task_id=f"transfer_{file}_to_disk", | ||
| sharefile_conn_id=self.sharefile_conn_id, | ||
| sharefile_path=details['sharefile_path'], | ||
| local_path=os.path.join(self.local_base_path, '{{ds_nodash}}', '{{ts_nodash}}', file), | ||
| delete_remote=self.delete_remote, | ||
| dag=self.dag, | ||
| **kwargs | ||
| ) | ||
|
|
||
| transform_to_jsonl = PythonOperator( | ||
| task_id=f"transform_{file}_to_jsonl", | ||
| python_callable=jsonl.translate_csv_file_to_jsonl, | ||
| op_kwargs={ | ||
| 'local_path': xcom_pull_template(transfer_sharefile_to_disk), | ||
| 'output_path': None, | ||
| 'delete_csv': self.delete_local_csv, | ||
| 'metadata_dict': {'file_source': file}, | ||
| }, | ||
| dag=self.dag, | ||
| **kwargs | ||
| ) | ||
|
|
||
| transfer_disk_to_s3 = PythonOperator( | ||
| task_id=f"transfer_{file}_to_s3", | ||
| python_callable=s3.local_filepath_to_s3, | ||
| op_kwargs={ | ||
| 'local_filepath': xcom_pull_template(transfer_sharefile_to_disk), | ||
| 's3_destination_key': f"{self.base_s3_destination_key}/" + '{{ds_nodash}}' + "/" + '{{ts_nodash}}' + f"/{file}", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One thing to consider here is sharding - by tenant, or along other dimensions. If we want to load data from Sharefile paths like
We could do that by looping over the years/tenant codes and creating a separate DAG for each. It could be nice if they were all in one DAG though, with separate tasks (or task groups?) for each tenant. Supporting this would complicate this DAG class, but could be quite useful. One way to do it could be to allow for an optional structure like shard_dimensions = {
"district": ["district1", "district2", ...],
"year": ["2024", "2025"]
}
base_s3_destination_key = "data/{district}/{year}/file.csv"
then make a task [group] per combination of dimension values, support formatted paths, and pass the shards down to other tasks:
from itertools import product
...
shard_groups = []
if shard_dimensions:
for item in product(*shard_dimensions.values()):
shard_groups.append(dict(zip(shard_dimensions.keys(), item)))
else: shard_groups.append({})
for shard_group in shard_groups:
path = base_s3_destination_key.format(**shard_group)
...
# and possibly pass the `shard_group` data into S3ToSnowflakeOperator()
# via `column_names_str` and `metadata_columns`
(This isn't a requirement to add to the class, just something to think about. It could be added later.) |
||
| 's3_conn_id': self.s3_conn_id | ||
| }, | ||
| dag=self.dag, | ||
| **kwargs | ||
| ) | ||
|
|
||
| if not self.transfer_s3_to_snowflake: | ||
| transfer_s3_to_snowflake = S3ToSnowflakeOperator( | ||
| task_id=f"{file}_s3_to_snowflake", | ||
| snowflake_conn_id=self.snowflake_conn_id, | ||
| database=self.database, | ||
| schema=self.schema, | ||
| table_name=file, | ||
| s3_destination_key=self.base_s3_destination_key, | ||
| full_refresh=self.full_refresh, | ||
| dag=self.dag, | ||
| **kwargs | ||
| ) | ||
| else: | ||
| transfer_s3_to_snowflake = self.build_python_operator(self.transfer_s3_to_snowflake) | ||
|
|
||
| if self.transform_csv_to_jsonl: | ||
| check_if_file_in_param >> transfer_sharefile_to_disk >> transform_to_jsonl >> transfer_disk_to_s3 >> transfer_s3_to_snowflake | ||
| else: | ||
| check_if_file_in_param >> transfer_sharefile_to_disk >> transfer_disk_to_s3 >> transfer_s3_to_snowflake | ||
|
|
||
| return self.dag | ||
Uh oh!
There was an error while loading. Please reload this page.