From 6fe9c36c13c73570c07a6b983133996fe57e0479 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Thu, 6 Mar 2025 08:29:30 -0700 Subject: [PATCH 01/38] test first class method --- .../sharefile_to_snowflake_dag_builder.py | 153 ++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py new file mode 100644 index 0000000..292b822 --- /dev/null +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -0,0 +1,153 @@ +import os + +from airflow import DAG +from airflow.models import Param +from airflow.operators.python import PythonOperator + +from util import io_helpers + +from ea_airflow_util.callables.airflow import skip_if_not_in_params_list, xcom_pull_template +from ea_airflow_util.callables import jsonl, snowflake +from ea_airflow_util.providers.sharefile.transfers.sharefile_to_disk import SharefileToDiskOperator +from edu_edfi_airflow.callables import s3 +from ea_airflow_util.providers.aws.operators.s3 import S3ToSnowflakeOperator + +from airflow.providers.amazon.aws.hooks.s3 import S3Hook + +class SharefileTransferToSnowflakeDagBuilder: + """ + :param configs_dir: + :param airflow_config: + :param sharefile_sources: + :param sharefile_conn_id: + :param s3_destination_key: + :param s3_conn_id: + :param snowflake_conn_id: + :: + """ + def __init__(self, + configs_dir: str, + airflow_config: str, + sharefile_sources: str, + sharefile_conn_id: str, + s3_destination_key: str, + s3_conn_id: str, + snowflake_conn_id: str + ): + self.configs_dir = configs_dir + self.airflow_config = airflow_config + self.dag_configs = self.airflow_configs['sharefile_transfer_pred'] + self.file_sources_dict = io_helpers.safe_load_yaml(configs_dir, sharefile_sources) + self.sharefile_conn_id = sharefile_conn_id + self.s3_destination_key = s3_destination_key + 
self.s3_conn_id = s3_conn_id + self.snowflake_conn_id = snowflake_conn_id + + self.dag = None + self._initialize_dag() + + def _initialize_dag(self): + params = { + "file_sources": Param( + default=list(self.file_sources_dict.get(self.run_type, {})), + examples=list(self.file_sources_dict.get(self.run_type, {})), + type="array", + description="Newline-separated list of file sources to pull from ShareFile (default all in `sharefile_sources.yml`)", + ), + } + + self.dag = DAG( + dag_id=f"sharefile_{self.run_type}_transfer", + default_args=self.dag_configs['default_args'], + schedule_interval=self.dag_configs['schedule_interval'], + params=params, + user_defined_macros={'slack_conn_id': None}, + catchup=False, + ) + + def check_if_file_in_params(self, file): + """Checks if the file exists in the provided parameters.""" + return PythonOperator( + task_id=f"check_{file}", + python_callable=skip_if_not_in_params_list, + op_kwargs={ + 'param_name': "file_sources", + 'value': file + }, + dag=self.dag + ) + + # def transfer_sharefile_to_disk(self, file, file_configs): + # """Transfers file from ShareFile to local disk.""" + # return SharefileToDiskOperator( + # task_id=f"transfer_{file}_to_disk", + # sharefile_conn_id=self.dag_configs['sharefile_conn_id'], + # sharefile_path=file_configs['sharefile_path'], + # local_path=self._generate_file_path(file, 'disk'), + # delete_remote=self.dag_configs['delete_remote'], + # dag=self.dag + # ) + + # def transform_to_jsonl(self, transfer_task, file): + # """Transforms CSV to JSONL.""" + # return PythonOperator( + # task_id=f"transform_{file}_to_jsonl", + # python_callable=jsonl.translate_csv_file_to_jsonl, + # op_kwargs={ + # 'local_path': xcom_pull_template(transfer_task), + # 'output_path': None, + # 'delete_csv': True, + # 'metadata_dict': {'file_source': file}, + # }, + # dag=self.dag + # ) + + # def _transfer_disk_to_s3(self, file): + # """Transfers file from local disk to S3.""" + # return PythonOperator( + # 
task_id=f"transfer_{file}_to_s3", + # python_callable=s3.local_filepath_to_s3, + # op_kwargs={ + # 'local_filepath': self._generate_file_path(file, 'disk'), + # 's3_destination_key': f"ea_research/{file}", + # 's3_conn_id': self.dag_configs['s3_conn_id'] + # }, + # dag=self.dag + # ) + + # def transfer_s3_to_snowflake(self, file): + # """Transfers file from S3 to Snowflake.""" + # return S3ToSnowflakeOperator( + # task_id=f"{file}_s3_to_snowflake", + # snowflake_conn_id=self.dag_configs['snowflake_conn_id'], + # database=self.dag_configs['snowflake_db'], + # schema=self.dag_configs['snowflake_schema'], + # table_name=file, + # s3_destination_key=f"ea_research/{file}", + # full_refresh=True, + # dag=self.dag + # ) + + # def build_dag(self): + # """Builds the complete DAG, setting up tasks and their dependencies.""" + # for file, file_configs in self.file_sources_dict.get(self.run_type, {}).items(): + # # Task for checking if file is in parameters list + # check_task = self._check_if_file_in_params(file) + + # # Task for transferring from ShareFile to Disk + # transfer_task = self._transfer_sharefile_to_disk(file, file_configs) + + # # Task for transforming from CSV to JSONL + # transform_task = self._transform_to_jsonl(transfer_task, file) + + # # Task for transferring from Disk to S3 + # transfer_to_s3_task = self._transfer_disk_to_s3(file) + + # # Task for transferring from S3 to Snowflake + # s3_to_snowflake_task = self._transfer_s3_to_snowflake(file) + + # # Set task dependencies in a linear flow + # check_task >> transfer_task >> transform_task >> transfer_to_s3_task >> s3_to_snowflake_task + + # return self.dag + From 8ce7eb99434b51dfb53b6f46f5cc922c34330116 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Fri, 7 Mar 2025 11:32:19 -0700 Subject: [PATCH 02/38] updated variable inputs to init and individual methods --- .../sharefile_to_snowflake_dag_builder.py | 151 ++++++++---------- 1 file changed, 69 insertions(+), 82 deletions(-) diff --git 
a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 292b822..1f29007 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -16,32 +16,18 @@ class SharefileTransferToSnowflakeDagBuilder: """ - :param configs_dir: - :param airflow_config: - :param sharefile_sources: - :param sharefile_conn_id: - :param s3_destination_key: - :param s3_conn_id: - :param snowflake_conn_id: - :: + :: """ - def __init__(self, - configs_dir: str, - airflow_config: str, - sharefile_sources: str, - sharefile_conn_id: str, - s3_destination_key: str, - s3_conn_id: str, - snowflake_conn_id: str + def __init__(self, + dag_id, + airflow_default_args, + file_sources, + schedule_interval = None ): - self.configs_dir = configs_dir - self.airflow_config = airflow_config - self.dag_configs = self.airflow_configs['sharefile_transfer_pred'] - self.file_sources_dict = io_helpers.safe_load_yaml(configs_dir, sharefile_sources) - self.sharefile_conn_id = sharefile_conn_id - self.s3_destination_key = s3_destination_key - self.s3_conn_id = s3_conn_id - self.snowflake_conn_id = snowflake_conn_id + self.dag_id = dag_id + self.airflow_default_args = airflow_default_args + self.file_sources = file_sources + self.schedule_interval = schedule_interval self.dag = None self._initialize_dag() @@ -49,19 +35,18 @@ def __init__(self, def _initialize_dag(self): params = { "file_sources": Param( - default=list(self.file_sources_dict.get(self.run_type, {})), - examples=list(self.file_sources_dict.get(self.run_type, {})), - type="array", - description="Newline-separated list of file sources to pull from ShareFile (default all in `sharefile_sources.yml`)", + default=list(self.file_sources, {}), + examples=list(self.file_sources, {}), + type="list", + description="Newline-separated list of file sources to pull from ShareFile", ), } self.dag = DAG( - 
dag_id=f"sharefile_{self.run_type}_transfer", - default_args=self.dag_configs['default_args'], - schedule_interval=self.dag_configs['schedule_interval'], + dag_id=self.dag_id, + default_args=self.airflow_default_args, + schedule_interval=self.schedule_interval, params=params, - user_defined_macros={'slack_conn_id': None}, catchup=False, ) @@ -77,56 +62,58 @@ def check_if_file_in_params(self, file): dag=self.dag ) - # def transfer_sharefile_to_disk(self, file, file_configs): - # """Transfers file from ShareFile to local disk.""" - # return SharefileToDiskOperator( - # task_id=f"transfer_{file}_to_disk", - # sharefile_conn_id=self.dag_configs['sharefile_conn_id'], - # sharefile_path=file_configs['sharefile_path'], - # local_path=self._generate_file_path(file, 'disk'), - # delete_remote=self.dag_configs['delete_remote'], - # dag=self.dag - # ) - - # def transform_to_jsonl(self, transfer_task, file): - # """Transforms CSV to JSONL.""" - # return PythonOperator( - # task_id=f"transform_{file}_to_jsonl", - # python_callable=jsonl.translate_csv_file_to_jsonl, - # op_kwargs={ - # 'local_path': xcom_pull_template(transfer_task), - # 'output_path': None, - # 'delete_csv': True, - # 'metadata_dict': {'file_source': file}, - # }, - # dag=self.dag - # ) - - # def _transfer_disk_to_s3(self, file): - # """Transfers file from local disk to S3.""" - # return PythonOperator( - # task_id=f"transfer_{file}_to_s3", - # python_callable=s3.local_filepath_to_s3, - # op_kwargs={ - # 'local_filepath': self._generate_file_path(file, 'disk'), - # 's3_destination_key': f"ea_research/{file}", - # 's3_conn_id': self.dag_configs['s3_conn_id'] - # }, - # dag=self.dag - # ) - - # def transfer_s3_to_snowflake(self, file): - # """Transfers file from S3 to Snowflake.""" - # return S3ToSnowflakeOperator( - # task_id=f"{file}_s3_to_snowflake", - # snowflake_conn_id=self.dag_configs['snowflake_conn_id'], - # database=self.dag_configs['snowflake_db'], - # schema=self.dag_configs['snowflake_schema'], - # 
table_name=file, - # s3_destination_key=f"ea_research/{file}", - # full_refresh=True, - # dag=self.dag - # ) + def transfer_sharefile_to_disk(self, file, sharefile_conn_id, + sharefile_path, local_path, delete_remote): + """Transfers file from ShareFile to local disk.""" + return SharefileToDiskOperator( + task_id=f"transfer_{file}_to_disk", + sharefile_conn_id=sharefile_conn_id, + sharefile_path=sharefile_path, + local_path=local_path, + delete_remote=delete_remote, + dag=self.dag + ) + + def transform_to_jsonl(self, file, local_path, delete_csv): + """Transforms CSV to JSONL.""" + return PythonOperator( + task_id=f"transform_{file}_to_jsonl", + python_callable=jsonl.translate_csv_file_to_jsonl, + op_kwargs={ + 'local_path': local_path, + 'output_path': None, + 'delete_csv': delete_csv, + 'metadata_dict': {'file_source': file}, + }, + dag=self.dag + ) + + def transfer_disk_to_s3(self, file): + """Transfers file from local disk to S3.""" + return PythonOperator( + task_id=f"transfer_{file}_to_s3", + python_callable=s3.local_filepath_to_s3, + op_kwargs={ + 'local_filepath': self._generate_file_path(file, 'disk'), + 's3_destination_key': f"ea_research/{file}", + 's3_conn_id': self.dag_configs['s3_conn_id'] + }, + dag=self.dag + ) + + def transfer_s3_to_snowflake(self, file, snowflake_conn_id, database, schema, + full_refresh): + """Transfers file from S3 to Snowflake.""" + return S3ToSnowflakeOperator( + task_id=f"{file}_s3_to_snowflake", + snowflake_conn_id=snowflake_conn_id, + database=database, + schema=schema, + table_name=file, + s3_destination_key=f"ea_research/{file}", + full_refresh=full_refresh, + dag=self.dag + ) # def build_dag(self): # """Builds the complete DAG, setting up tasks and their dependencies.""" From 40cac6f7ca8e59325a97d9760f1004c126fc1373 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Fri, 7 Mar 2025 13:21:39 -0700 Subject: [PATCH 03/38] added dock strings to all methods --- .../sharefile_to_snowflake_dag_builder.py | 97 
+++++++++++++++++-- 1 file changed, 91 insertions(+), 6 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 1f29007..5dafae1 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -16,7 +16,16 @@ class SharefileTransferToSnowflakeDagBuilder: """ - :: + This class is responsible for creating an Apache Airflow DAG that automates the process of transferring + files from ShareFile to Snowflake. It defines the steps to handle file transfers, transformations, and + data loading operations. The DAG is dynamically built based on provided parameters such as file sources + and schedule interval. Each method in this class corresponds to a specific step in the workflow. + + Attributes: + dag_id (str): The ID for the Airflow DAG. + airflow_default_args (dict): Default arguments passed to the DAG. + file_sources (list): List of file sources to be processed. + schedule_interval (str or None): The schedule interval for the DAG. """ def __init__(self, dag_id, @@ -33,6 +42,15 @@ def __init__(self, self._initialize_dag() def _initialize_dag(self): + """ + Initializes the DAG with the provided configuration and sets up the parameters. + + This method creates a DAG object and configures its parameters, including the list of file sources. + The file sources are passed as an Airflow parameter, allowing the workflow to be flexible in handling + multiple sources. + + This is an internal method used to initialize the DAG. + """ params = { "file_sources": Param( default=list(self.file_sources, {}), @@ -51,7 +69,18 @@ def _initialize_dag(self): ) def check_if_file_in_params(self, file): - """Checks if the file exists in the provided parameters.""" + """ + Checks if the specified file exists in the provided Airflow parameters list. 
+ + This method creates a PythonOperator task that calls a custom function `skip_if_not_in_params_list` + to check if the given file exists in the parameters defined in the DAG. + + Args: + file (str): The name of the file to check in the parameters. + + Returns: + PythonOperator: The Airflow task that checks if the file is in the parameters list. + """ return PythonOperator( task_id=f"check_{file}", python_callable=skip_if_not_in_params_list, @@ -64,7 +93,23 @@ def check_if_file_in_params(self, file): def transfer_sharefile_to_disk(self, file, sharefile_conn_id, sharefile_path, local_path, delete_remote): - """Transfers file from ShareFile to local disk.""" + """ + Transfers a file from ShareFile to a local disk. + + This method creates a SharefileToDiskOperator task that handles the actual transfer of a file from + ShareFile to a specified local disk location. The file is downloaded from ShareFile and saved + locally, and the option to delete the remote file after transfer is provided. + + Args: + file (str): The name of the file to transfer. + sharefile_conn_id (str): The Airflow connection ID for ShareFile. + sharefile_path (str): The file path in ShareFile from where the file will be fetched. + local_path (str): The local directory path where the file will be saved. + delete_remote (bool): Flag indicating whether to delete the remote file after transfer. + + Returns: + SharefileToDiskOperator: The Airflow task to transfer the file from ShareFile to local disk. + """ return SharefileToDiskOperator( task_id=f"transfer_{file}_to_disk", sharefile_conn_id=sharefile_conn_id, @@ -75,7 +120,21 @@ def transfer_sharefile_to_disk(self, file, sharefile_conn_id, ) def transform_to_jsonl(self, file, local_path, delete_csv): - """Transforms CSV to JSONL.""" + """ + Transforms a CSV file to JSONL format. + + This method creates a PythonOperator task to invoke a transformation function that converts a CSV + file into JSONL format. 
The option to delete the original CSV file after transformation is also + supported. + + Args: + file (str): The name of the file to transform. + local_path (str): The local file path of the CSV file to be transformed. + delete_csv (bool): Flag indicating whether to delete the CSV file after transformation. + + Returns: + PythonOperator: The Airflow task that transforms the CSV to JSONL. + """ return PythonOperator( task_id=f"transform_{file}_to_jsonl", python_callable=jsonl.translate_csv_file_to_jsonl, @@ -89,7 +148,18 @@ def transform_to_jsonl(self, file, local_path, delete_csv): ) def transfer_disk_to_s3(self, file): - """Transfers file from local disk to S3.""" + """ + Transfers a file from local disk to Amazon S3. + + This method creates a PythonOperator task that uploads a file from the local disk to an S3 bucket. + The file will be stored in a specific location defined by the S3 destination key. + + Args: + file (str): The name of the file to transfer. + + Returns: + PythonOperator: The Airflow task to transfer the file from local disk to S3. + """ return PythonOperator( task_id=f"transfer_{file}_to_s3", python_callable=s3.local_filepath_to_s3, @@ -103,7 +173,22 @@ def transfer_disk_to_s3(self, file): def transfer_s3_to_snowflake(self, file, snowflake_conn_id, database, schema, full_refresh): - """Transfers file from S3 to Snowflake.""" + """ + Transfers a file from Amazon S3 to Snowflake. + + This method creates an S3ToSnowflakeOperator task that loads a file from S3 into a Snowflake table. + It supports full refresh functionality, which replaces the existing data with the new data. + + Args: + file (str): The name of the file to transfer. + snowflake_conn_id (str): The Airflow connection ID for Snowflake. + database (str): The Snowflake database where the data will be loaded. + schema (str): The Snowflake schema where the data will be loaded. + full_refresh (bool): Flag to determine whether to perform a full refresh of the data. 
+ + Returns: + S3ToSnowflakeOperator: The Airflow task to transfer the file from S3 to Snowflake. + """ return S3ToSnowflakeOperator( task_id=f"{file}_s3_to_snowflake", snowflake_conn_id=snowflake_conn_id, From d550d0c4ed30380dc917ecf588f76cb96b2a9b62 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Sun, 9 Mar 2025 21:47:19 -0600 Subject: [PATCH 04/38] updated local path organization --- .../sharefile_to_snowflake_dag_builder.py | 74 ++++--------------- 1 file changed, 14 insertions(+), 60 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 5dafae1..d12f67c 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -4,9 +4,7 @@ from airflow.models import Param from airflow.operators.python import PythonOperator -from util import io_helpers - -from ea_airflow_util.callables.airflow import skip_if_not_in_params_list, xcom_pull_template +from ea_airflow_util.callables.airflow import skip_if_not_in_params_list from ea_airflow_util.callables import jsonl, snowflake from ea_airflow_util.providers.sharefile.transfers.sharefile_to_disk import SharefileToDiskOperator from edu_edfi_airflow.callables import s3 @@ -14,12 +12,11 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook + class SharefileTransferToSnowflakeDagBuilder: """ This class is responsible for creating an Apache Airflow DAG that automates the process of transferring - files from ShareFile to Snowflake. It defines the steps to handle file transfers, transformations, and - data loading operations. The DAG is dynamically built based on provided parameters such as file sources - and schedule interval. Each method in this class corresponds to a specific step in the workflow. + files from ShareFile to Snowflake. Attributes: dag_id (str): The ID for the Airflow DAG. 
@@ -45,16 +42,12 @@ def _initialize_dag(self): """ Initializes the DAG with the provided configuration and sets up the parameters. - This method creates a DAG object and configures its parameters, including the list of file sources. - The file sources are passed as an Airflow parameter, allowing the workflow to be flexible in handling - multiple sources. - This is an internal method used to initialize the DAG. """ params = { "file_sources": Param( - default=list(self.file_sources, {}), - examples=list(self.file_sources, {}), + default=list(self.file_sources), + examples=list(self.file_sources), type="list", description="Newline-separated list of file sources to pull from ShareFile", ), @@ -72,9 +65,6 @@ def check_if_file_in_params(self, file): """ Checks if the specified file exists in the provided Airflow parameters list. - This method creates a PythonOperator task that calls a custom function `skip_if_not_in_params_list` - to check if the given file exists in the parameters defined in the DAG. - Args: file (str): The name of the file to check in the parameters. @@ -96,10 +86,6 @@ def transfer_sharefile_to_disk(self, file, sharefile_conn_id, """ Transfers a file from ShareFile to a local disk. - This method creates a SharefileToDiskOperator task that handles the actual transfer of a file from - ShareFile to a specified local disk location. The file is downloaded from ShareFile and saved - locally, and the option to delete the remote file after transfer is provided. - Args: file (str): The name of the file to transfer. sharefile_conn_id (str): The Airflow connection ID for ShareFile. @@ -123,10 +109,6 @@ def transform_to_jsonl(self, file, local_path, delete_csv): """ Transforms a CSV file to JSONL format. - This method creates a PythonOperator task to invoke a transformation function that converts a CSV - file into JSONL format. The option to delete the original CSV file after transformation is also - supported. 
- Args: file (str): The name of the file to transform. local_path (str): The local file path of the CSV file to be transformed. @@ -147,15 +129,14 @@ def transform_to_jsonl(self, file, local_path, delete_csv): dag=self.dag ) - def transfer_disk_to_s3(self, file): + def transfer_disk_to_s3(self, file, local_path, s3_conn_id): """ Transfers a file from local disk to Amazon S3. - This method creates a PythonOperator task that uploads a file from the local disk to an S3 bucket. - The file will be stored in a specific location defined by the S3 destination key. - Args: file (str): The name of the file to transfer. + local_path (str): The local directory path where the files are saved. + s3_conn_id (str): The Airflow connection ID for AWS S3. Returns: PythonOperator: The Airflow task to transfer the file from local disk to S3. @@ -164,21 +145,18 @@ def transfer_disk_to_s3(self, file): task_id=f"transfer_{file}_to_s3", python_callable=s3.local_filepath_to_s3, op_kwargs={ - 'local_filepath': self._generate_file_path(file, 'disk'), + 'local_filepath': local_path, 's3_destination_key': f"ea_research/{file}", - 's3_conn_id': self.dag_configs['s3_conn_id'] + 's3_conn_id': s3_conn_id }, dag=self.dag ) - def transfer_s3_to_snowflake(self, file, snowflake_conn_id, database, schema, - full_refresh): + def transfer_s3_to_snowflake(self, file, snowflake_conn_id, database, + schema, s3_destination_key, full_refresh): """ Transfers a file from Amazon S3 to Snowflake. - This method creates an S3ToSnowflakeOperator task that loads a file from S3 into a Snowflake table. - It supports full refresh functionality, which replaces the existing data with the new data. - Args: file (str): The name of the file to transfer. snowflake_conn_id (str): The Airflow connection ID for Snowflake. 
@@ -195,31 +173,7 @@ def transfer_s3_to_snowflake(self, file, snowflake_conn_id, database, schema, database=database, schema=schema, table_name=file, - s3_destination_key=f"ea_research/{file}", + s3_destination_key=s3_destination_key, full_refresh=full_refresh, dag=self.dag - ) - - # def build_dag(self): - # """Builds the complete DAG, setting up tasks and their dependencies.""" - # for file, file_configs in self.file_sources_dict.get(self.run_type, {}).items(): - # # Task for checking if file is in parameters list - # check_task = self._check_if_file_in_params(file) - - # # Task for transferring from ShareFile to Disk - # transfer_task = self._transfer_sharefile_to_disk(file, file_configs) - - # # Task for transforming from CSV to JSONL - # transform_task = self._transform_to_jsonl(transfer_task, file) - - # # Task for transferring from Disk to S3 - # transfer_to_s3_task = self._transfer_disk_to_s3(file) - - # # Task for transferring from S3 to Snowflake - # s3_to_snowflake_task = self._transfer_s3_to_snowflake(file) - - # # Set task dependencies in a linear flow - # check_task >> transfer_task >> transform_task >> transfer_to_s3_task >> s3_to_snowflake_task - - # return self.dag - + ) \ No newline at end of file From 36d965198f3667e61afd2c3855b87daf521e4076 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Sun, 9 Mar 2025 22:26:25 -0600 Subject: [PATCH 05/38] updated way that dag is initialized --- ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index d12f67c..b4c0d43 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -36,9 +36,9 @@ def __init__(self, self.schedule_interval = schedule_interval self.dag = None - self._initialize_dag() + # self._initialize_dag() - def 
_initialize_dag(self): + def initialize_dag(self): """ Initializes the DAG with the provided configuration and sets up the parameters. From 7c4734f6ee0981efeaeb91e413399bdebb2a4027 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Sun, 9 Mar 2025 22:35:51 -0600 Subject: [PATCH 06/38] added global to dag --- ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index b4c0d43..6c5367f 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -36,9 +36,9 @@ def __init__(self, self.schedule_interval = schedule_interval self.dag = None - # self._initialize_dag() + self._initialize_dag() - def initialize_dag(self): + def _initialize_dag(self): """ Initializes the DAG with the provided configuration and sets up the parameters. @@ -61,6 +61,8 @@ def initialize_dag(self): catchup=False, ) + globals()[self.dag.dag_id] = self.dag + def check_if_file_in_params(self, file): """ Checks if the specified file exists in the provided Airflow parameters list. From 5d590b902417fcdd1eebe06a42b419882cc00cc0 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Sun, 9 Mar 2025 23:12:14 -0600 Subject: [PATCH 07/38] updated dag call structure --- .../sharefile_to_snowflake_dag_builder.py | 47 +++++++------------ 1 file changed, 17 insertions(+), 30 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 6c5367f..1cebccb 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -24,44 +24,29 @@ class SharefileTransferToSnowflakeDagBuilder: file_sources (list): List of file sources to be processed. 
schedule_interval (str or None): The schedule interval for the DAG. """ + + params_dict = { + "file_sources": Param( + default=list(self.file_sources), + examples=list(self.file_sources), + type="list", + description="Newline-separated list of file sources to pull from ShareFile", + ), + } + def __init__(self, dag_id, airflow_default_args, file_sources, - schedule_interval = None + schedule_interval = None, + **kwargs ): self.dag_id = dag_id self.airflow_default_args = airflow_default_args self.file_sources = file_sources self.schedule_interval = schedule_interval - self.dag = None - self._initialize_dag() - - def _initialize_dag(self): - """ - Initializes the DAG with the provided configuration and sets up the parameters. - - This is an internal method used to initialize the DAG. - """ - params = { - "file_sources": Param( - default=list(self.file_sources), - examples=list(self.file_sources), - type="list", - description="Newline-separated list of file sources to pull from ShareFile", - ), - } - - self.dag = DAG( - dag_id=self.dag_id, - default_args=self.airflow_default_args, - schedule_interval=self.schedule_interval, - params=params, - catchup=False, - ) - - globals()[self.dag.dag_id] = self.dag + self.dag = Dag(params=self.params_dict, **kwargs) def check_if_file_in_params(self, file): """ @@ -107,7 +92,8 @@ def transfer_sharefile_to_disk(self, file, sharefile_conn_id, dag=self.dag ) - def transform_to_jsonl(self, file, local_path, delete_csv): + def transform_to_jsonl(self, file, local_path, delete_csv + ) -> PythonOperator: """ Transforms a CSV file to JSONL format. @@ -131,7 +117,8 @@ def transform_to_jsonl(self, file, local_path, delete_csv): dag=self.dag ) - def transfer_disk_to_s3(self, file, local_path, s3_conn_id): + def transfer_disk_to_s3(self, file, local_path, s3_conn_id + ) -> PythonOperator: """ Transfers a file from local disk to Amazon S3. 
From efb510d86c0f85b9ff3a0d0968f0b65906e5564a Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Sun, 9 Mar 2025 23:25:21 -0600 Subject: [PATCH 08/38] updated way params is called within Dag instantion --- .../sharefile_to_snowflake_dag_builder.py | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 1cebccb..cf5fd16 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -25,15 +25,6 @@ class SharefileTransferToSnowflakeDagBuilder: schedule_interval (str or None): The schedule interval for the DAG. """ - params_dict = { - "file_sources": Param( - default=list(self.file_sources), - examples=list(self.file_sources), - type="list", - description="Newline-separated list of file sources to pull from ShareFile", - ), - } - def __init__(self, dag_id, airflow_default_args, @@ -46,7 +37,20 @@ def __init__(self, self.file_sources = file_sources self.schedule_interval = schedule_interval - self.dag = Dag(params=self.params_dict, **kwargs) + self.params_dict = { + "file_sources": Param( + default=list(self.file_sources), + examples=list(self.file_sources), + type="list", + description="Newline-separated list of file sources to pull from ShareFile", + ), + } + + self.dag = DAG(dag_id=self.dag_id, + params=self.params_dict, + default_args=self.airflow_default_args, + schedule_interval=self.schedule_interval, + **kwargs) def check_if_file_in_params(self, file): """ From db969c8c1d19e774531e50352c681cb8c8ba0cb1 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Sun, 9 Mar 2025 23:33:00 -0600 Subject: [PATCH 09/38] small update --- ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py 
b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index cf5fd16..ccb3407 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -29,8 +29,7 @@ def __init__(self, dag_id, airflow_default_args, file_sources, - schedule_interval = None, - **kwargs + schedule_interval = None ): self.dag_id = dag_id self.airflow_default_args = airflow_default_args @@ -49,8 +48,8 @@ def __init__(self, self.dag = DAG(dag_id=self.dag_id, params=self.params_dict, default_args=self.airflow_default_args, - schedule_interval=self.schedule_interval, - **kwargs) + schedule_interval=self.schedule_interval + ) def check_if_file_in_params(self, file): """ From b335d66586f434bfd4e9c1abac88db2cad9838ed Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Sun, 9 Mar 2025 23:41:12 -0600 Subject: [PATCH 10/38] updated file_sources to include .keys --- ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index ccb3407..9f8e551 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -38,8 +38,8 @@ def __init__(self, self.params_dict = { "file_sources": Param( - default=list(self.file_sources), - examples=list(self.file_sources), + default=list(self.file_sources.keys()), + examples=list(self.file_sources.keys()), type="list", description="Newline-separated list of file sources to pull from ShareFile", ), From 2fa3399c93c71315c4a94e59f99c55b56497ec4b Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Sun, 9 Mar 2025 23:57:34 -0600 Subject: [PATCH 11/38] updated dag to using eacustomdag --- .../dags/sharefile_to_snowflake_dag_builder.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git 
a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 9f8e551..b54d586 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -11,6 +11,7 @@ from ea_airflow_util.providers.aws.operators.s3 import S3ToSnowflakeOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from ea_airflow_util import EACustomDAG class SharefileTransferToSnowflakeDagBuilder: @@ -27,13 +28,13 @@ class SharefileTransferToSnowflakeDagBuilder: def __init__(self, dag_id, - airflow_default_args, - file_sources, + airflow_default_args_dict, + file_sources_dict, schedule_interval = None ): self.dag_id = dag_id - self.airflow_default_args = airflow_default_args - self.file_sources = file_sources + self.airflow_default_args = airflow_default_args_dict + self.file_sources = file_sources_dict self.schedule_interval = schedule_interval self.params_dict = { @@ -45,8 +46,8 @@ def __init__(self, ), } - self.dag = DAG(dag_id=self.dag_id, - params=self.params_dict, + self.dag = EACustomDAG(dag_id=self.dag_id, + params=self.params_dict, default_args=self.airflow_default_args, schedule_interval=self.schedule_interval ) From 58a03321f14988b29de3de6091c008bc559257ae Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Mon, 10 Mar 2025 14:04:19 -0500 Subject: [PATCH 12/38] updated param type to array --- ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index b54d586..65dd3cb 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -41,7 +41,7 @@ def __init__(self, "file_sources": Param( default=list(self.file_sources.keys()), examples=list(self.file_sources.keys()), - 
type="list", + type="array", description="Newline-separated list of file sources to pull from ShareFile", ), } From 5ec70e41aa0c365ad01905bfe9870484fb38187f Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Tue, 11 Mar 2025 10:56:56 -0500 Subject: [PATCH 13/38] updated sharefile to snowflake dag builder script to include global exposure of dag id --- ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 65dd3cb..7f058fa 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -51,6 +51,8 @@ def __init__(self, default_args=self.airflow_default_args, schedule_interval=self.schedule_interval ) + + globals()[self.dag_id] = self.dag def check_if_file_in_params(self, file): """ From 929d9ad1765cd9f5e4614c830dc41b6ed51f8628 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Tue, 11 Mar 2025 11:05:51 -0500 Subject: [PATCH 14/38] removed global function because it did not work for exposing dag id --- ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 7f058fa..65dd3cb 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -51,8 +51,6 @@ def __init__(self, default_args=self.airflow_default_args, schedule_interval=self.schedule_interval ) - - globals()[self.dag_id] = self.dag def check_if_file_in_params(self, file): """ From f7178acc21ad66dd59af6f1a8c6d27dc248e56a9 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Tue, 11 Mar 2025 14:27:46 -0500 Subject: [PATCH 15/38] updated s3.py in attempt to fix metadata_column bug ---
ea_airflow_util/providers/aws/operators/s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ea_airflow_util/providers/aws/operators/s3.py b/ea_airflow_util/providers/aws/operators/s3.py index 683ad00..d680f20 100644 --- a/ea_airflow_util/providers/aws/operators/s3.py +++ b/ea_airflow_util/providers/aws/operators/s3.py @@ -186,7 +186,7 @@ def execute(self, context): TO_TIMESTAMP(REGEXP_SUBSTR(metadata$filename, '{ts_regex}'), 'YYYYMMDDTHH24MISS') AS pull_timestamp, metadata$file_row_number AS file_row_number, metadata$filename AS filename, - {metadata_columns} + {metadata_columns if metadata_columns else ''} t.$1 AS v FROM @{self.database}.util.airflow_stage/{self.s3_destination_key} (file_format => 'json_default') t From 9ef191fefcab1444d17db6417bb746fe76df478d Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Wed, 12 Mar 2025 06:48:19 -0500 Subject: [PATCH 16/38] updated class so that it would construct required folder structure --- .../sharefile_to_snowflake_dag_builder.py | 40 ++++++++++++++++--- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 65dd3cb..804c844 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -1,4 +1,5 @@ import os +from datetime import datetime from airflow import DAG from airflow.models import Param @@ -51,6 +52,29 @@ def __init__(self, default_args=self.airflow_default_args, schedule_interval=self.schedule_interval ) + + def build_structured_path(self, base_path, file, separator="/"): + """ + Constructs a structured path using the current date and time. + + Args: + base_path (str): The root directory or base key where files should be stored. + file (str): The filename. + separator (str, optional): The separator to use for the path. 
Defaults to '/' for directories + and can be set to an appropriate separator for other storage systems + (e.g., AWS S3 uses '/'). + + Returns: + str: The fully structured path where the file will be stored. + """ + # Get the execution date and time + execution_date = datetime.now().strftime('%Y%m%d') # e.g., 20250312 + execution_time = datetime.now().strftime('%H%M%S') # e.g., 143500 + + # Construct the full path (e.g., "/tmp/sharefile/20250312/143500/file.csv" OR "s3://bucket/key/20250312/143500/file.csv") + structured_path = f"{base_path}{separator}{execution_date}{separator}{execution_time}{separator}{file}" + + return structured_path def check_if_file_in_params(self, file): """ @@ -73,7 +97,7 @@ def check_if_file_in_params(self, file): ) def transfer_sharefile_to_disk(self, file, sharefile_conn_id, - sharefile_path, local_path, delete_remote): + sharefile_path, base_path, delete_remote): """ Transfers a file from ShareFile to a local disk. @@ -81,17 +105,21 @@ def transfer_sharefile_to_disk(self, file, sharefile_conn_id, file (str): The name of the file to transfer. sharefile_conn_id (str): The Airflow connection ID for ShareFile. sharefile_path (str): The file path in ShareFile from where the file will be fetched. - local_path (str): The local directory path where the file will be saved. + base_local_path (str): The root directory where files should be stored. delete_remote (bool): Flag indicating whether to delete the remote file after transfer. Returns: SharefileToDiskOperator: The Airflow task to transfer the file from ShareFile to local disk. 
""" + structured_local_path = self.build_structured_path(base_path, file) + + os.makedirs(os.path.dirname(structured_local_path), exist_ok=True) + return SharefileToDiskOperator( task_id=f"transfer_{file}_to_disk", sharefile_conn_id=sharefile_conn_id, sharefile_path=sharefile_path, - local_path=local_path, + local_path=structured_local_path, delete_remote=delete_remote, dag=self.dag ) @@ -121,7 +149,7 @@ def transform_to_jsonl(self, file, local_path, delete_csv dag=self.dag ) - def transfer_disk_to_s3(self, file, local_path, s3_conn_id + def transfer_disk_to_s3(self, file, local_path, base_s3_key, s3_conn_id ) -> PythonOperator: """ Transfers a file from local disk to Amazon S3. @@ -134,12 +162,14 @@ def transfer_disk_to_s3(self, file, local_path, s3_conn_id Returns: PythonOperator: The Airflow task to transfer the file from local disk to S3. """ + structured_s3_key = self.build_structured_path(base_s3_key, file, separator="/") + return PythonOperator( task_id=f"transfer_{file}_to_s3", python_callable=s3.local_filepath_to_s3, op_kwargs={ 'local_filepath': local_path, - 's3_destination_key': f"ea_research/{file}", + 's3_destination_key': structured_s3_key, 's3_conn_id': s3_conn_id }, dag=self.dag From 111dc3beeb4c51840d1c4d0b5b9ab7499fd13984 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Wed, 12 Mar 2025 06:56:02 -0500 Subject: [PATCH 17/38] updated input variable name --- .../dags/sharefile_to_snowflake_dag_builder.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 804c844..53762fa 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -68,8 +68,8 @@ def build_structured_path(self, base_path, file, separator="/"): str: The fully structured path where the file will be stored. 
""" # Get the execution date and time - execution_date = datetime.now().strftime('%Y%m%d') # e.g., 20250312 - execution_time = datetime.now().strftime('%H%M%S') # e.g., 143500 + execution_date = datetime.now().strftime('%Y%m%d') + execution_time = datetime.now().strftime('%H%M%S') # Construct the full path (e.g., "/tmp/sharefile/20250312/143500/file.csv" OR "s3://bucket/key/20250312/143500/file.csv") structured_path = f"{base_path}{separator}{execution_date}{separator}{execution_time}{separator}{file}" @@ -97,7 +97,7 @@ def check_if_file_in_params(self, file): ) def transfer_sharefile_to_disk(self, file, sharefile_conn_id, - sharefile_path, base_path, delete_remote): + sharefile_path, local_base_path, delete_remote): """ Transfers a file from ShareFile to a local disk. @@ -111,7 +111,7 @@ def transfer_sharefile_to_disk(self, file, sharefile_conn_id, Returns: SharefileToDiskOperator: The Airflow task to transfer the file from ShareFile to local disk. """ - structured_local_path = self.build_structured_path(base_path, file) + structured_local_path = self.build_structured_path(local_base_path, file) os.makedirs(os.path.dirname(structured_local_path), exist_ok=True) From 248f4d6990278f42721951374b88e23acee27d26 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Wed, 12 Mar 2025 07:19:28 -0500 Subject: [PATCH 18/38] updated way that local path is structured --- .../dags/sharefile_to_snowflake_dag_builder.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 53762fa..945c8db 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -149,7 +149,7 @@ def transform_to_jsonl(self, file, local_path, delete_csv dag=self.dag ) - def transfer_disk_to_s3(self, file, local_path, base_s3_key, s3_conn_id + def transfer_disk_to_s3(self, file, 
local_path, base_s3_destination_key, s3_conn_id ) -> PythonOperator: """ Transfers a file from local disk to Amazon S3. @@ -162,7 +162,7 @@ def transfer_disk_to_s3(self, file, local_path, base_s3_key, s3_conn_id Returns: PythonOperator: The Airflow task to transfer the file from local disk to S3. """ - structured_s3_key = self.build_structured_path(base_s3_key, file, separator="/") + structured_s3_key = self.build_structured_path(base_s3_destination_key, file, separator="/") return PythonOperator( task_id=f"transfer_{file}_to_s3", @@ -176,7 +176,7 @@ def transfer_disk_to_s3(self, file, local_path, base_s3_key, s3_conn_id ) def transfer_s3_to_snowflake(self, file, snowflake_conn_id, database, - schema, s3_destination_key, full_refresh): + schema, base_s3_destination_key, full_refresh): """ Transfers a file from Amazon S3 to Snowflake. @@ -196,7 +196,7 @@ def transfer_s3_to_snowflake(self, file, snowflake_conn_id, database, database=database, schema=schema, table_name=file, - s3_destination_key=s3_destination_key, + s3_destination_key=base_s3_destination_key, full_refresh=full_refresh, dag=self.dag ) \ No newline at end of file From 89a340b4bc98086f6bf97de8a7612ad51ba99fc0 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Wed, 12 Mar 2025 08:42:27 -0500 Subject: [PATCH 19/38] updated where date and timestamp are instantiated --- .../dags/sharefile_to_snowflake_dag_builder.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 945c8db..e07357a 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -52,10 +52,13 @@ def __init__(self, default_args=self.airflow_default_args, schedule_interval=self.schedule_interval ) + + self.pull_date = datetime.now().strftime('%Y%m%d') + self.pull_timestamp = 
datetime.now().strftime('%Y%m%dT%H%M%S') def build_structured_path(self, base_path, file, separator="/"): """ - Constructs a structured path using the current date and time. + Constructs a structured path using the current date and timestamp. Args: base_path (str): The root directory or base key where files should be stored. @@ -68,11 +71,10 @@ def build_structured_path(self, base_path, file, separator="/"): str: The fully structured path where the file will be stored. """ # Get the execution date and time - execution_date = datetime.now().strftime('%Y%m%d') - execution_time = datetime.now().strftime('%H%M%S') + # Construct the full path (e.g., "/tmp/sharefile/20250312/143500/file.csv" OR "s3://bucket/key/20250312/143500/file.csv") - structured_path = f"{base_path}{separator}{execution_date}{separator}{execution_time}{separator}{file}" + structured_path = f"{base_path}{separator}{self.pull_date}{separator}{self.pull_timestamp}{separator}{file}" return structured_path From 85f91ebf582bbb1d126e0f86aaffaf144c9a2c9c Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Wed, 12 Mar 2025 08:58:28 -0500 Subject: [PATCH 20/38] updated timestamp code to use airflow runtime instead of datetime now method --- ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index e07357a..a68e8ec 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -53,8 +53,8 @@ def __init__(self, schedule_interval=self.schedule_interval ) - self.pull_date = datetime.now().strftime('%Y%m%d') - self.pull_timestamp = datetime.now().strftime('%Y%m%dT%H%M%S') + self.pull_date = "{{ ds_nodash }}" + self.pull_timestamp = "{{ ts_nodash }}" def build_structured_path(self, base_path, file, separator="/"): """ @@ -70,7 +70,6 @@ def 
build_structured_path(self, base_path, file, separator="/"): Returns: str: The fully structured path where the file will be stored. """ - # Get the execution date and time # Construct the full path (e.g., "/tmp/sharefile/20250312/143500/file.csv" OR "s3://bucket/key/20250312/143500/file.csv") From 9882e64f0ac2e81a4df8dada8f1aac6f5d9a798b Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Wed, 12 Mar 2025 09:02:19 -0500 Subject: [PATCH 21/38] updated timestamp to go back to original form --- ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index a68e8ec..1d3b064 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -53,8 +53,8 @@ def __init__(self, schedule_interval=self.schedule_interval ) - self.pull_date = "{{ ds_nodash }}" - self.pull_timestamp = "{{ ts_nodash }}" + self.pull_date = datetime.now().strftime('%Y%m%d') + self.pull_timestamp = datetime.now().strftime('%Y%m%dT%H%M%S') def build_structured_path(self, base_path, file, separator="/"): """ From 9644caf7399026a6320f551d490fe491a63c0f55 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Wed, 12 Mar 2025 09:30:13 -0500 Subject: [PATCH 22/38] updated way filepath is created --- .../dags/sharefile_to_snowflake_dag_builder.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 1d3b064..0bca18c 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -73,7 +73,7 @@ def build_structured_path(self, base_path, file, separator="/"): # Construct the full path (e.g., 
"/tmp/sharefile/20250312/143500/file.csv" OR "s3://bucket/key/20250312/143500/file.csv") - structured_path = f"{base_path}{separator}{self.pull_date}{separator}{self.pull_timestamp}{separator}{file}" + structured_path = f"{base_path}{separator}{'ds_nodash'}{separator}{'ts_nodash'}{separator}{file}" return structured_path @@ -112,15 +112,16 @@ def transfer_sharefile_to_disk(self, file, sharefile_conn_id, Returns: SharefileToDiskOperator: The Airflow task to transfer the file from ShareFile to local disk. """ - structured_local_path = self.build_structured_path(local_base_path, file) + # structured_local_path = self.build_structured_path(local_base_path, file) - os.makedirs(os.path.dirname(structured_local_path), exist_ok=True) + # os.makedirs(os.path.dirname(structured_local_path), exist_ok=True) return SharefileToDiskOperator( task_id=f"transfer_{file}_to_disk", sharefile_conn_id=sharefile_conn_id, sharefile_path=sharefile_path, - local_path=structured_local_path, + # local_path=structured_local_path, + local_path=os.path.join(local_base_path, '{{ds_nodash}}', '{{ts_nodash}}', file), delete_remote=delete_remote, dag=self.dag ) @@ -163,14 +164,14 @@ def transfer_disk_to_s3(self, file, local_path, base_s3_destination_key, s3_conn Returns: PythonOperator: The Airflow task to transfer the file from local disk to S3. 
""" - structured_s3_key = self.build_structured_path(base_s3_destination_key, file, separator="/") + # structured_s3_key = self.build_structured_path(base_s3_destination_key, file, separator="/") return PythonOperator( task_id=f"transfer_{file}_to_s3", python_callable=s3.local_filepath_to_s3, op_kwargs={ 'local_filepath': local_path, - 's3_destination_key': structured_s3_key, + 's3_destination_key': f"{base_s3_destination_key}/" + '{{ds_nodash}}' + "/" + '{{ts_nodash}}' + f"/{file}", 's3_conn_id': s3_conn_id }, dag=self.dag From 74e8615bbea91aa04769abf1f8307ac1491b927f Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Wed, 12 Mar 2025 13:10:07 -0500 Subject: [PATCH 23/38] save final changes to sharefile to snowflake dag before complete refactor --- .../sharefile_to_snowflake_dag_builder.py | 25 +++---------------- 1 file changed, 3 insertions(+), 22 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 0bca18c..4d4ed1f 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -55,27 +55,6 @@ def __init__(self, self.pull_date = datetime.now().strftime('%Y%m%d') self.pull_timestamp = datetime.now().strftime('%Y%m%dT%H%M%S') - - def build_structured_path(self, base_path, file, separator="/"): - """ - Constructs a structured path using the current date and timestamp. - - Args: - base_path (str): The root directory or base key where files should be stored. - file (str): The filename. - separator (str, optional): The separator to use for the path. Defaults to '/' for directories - and can be set to an appropriate separator for other storage systems - (e.g., AWS S3 uses '/'). - - Returns: - str: The fully structured path where the file will be stored. 
- """ - - - # Construct the full path (e.g., "/tmp/sharefile/20250312/143500/file.csv" OR "s3://bucket/key/20250312/143500/file.csv") - structured_path = f"{base_path}{separator}{'ds_nodash'}{separator}{'ts_nodash'}{separator}{file}" - - return structured_path def check_if_file_in_params(self, file): """ @@ -201,4 +180,6 @@ def transfer_s3_to_snowflake(self, file, snowflake_conn_id, database, s3_destination_key=base_s3_destination_key, full_refresh=full_refresh, dag=self.dag - ) \ No newline at end of file + ) + + \ No newline at end of file From 01e84386ec9c9cd548ccfdbb71ef52f45bd296c4 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Wed, 12 Mar 2025 15:52:05 -0500 Subject: [PATCH 24/38] updated sharefile to snowflake dag builder class to remove unnecessary wrappers for each python operator and python callable --- .../sharefile_to_snowflake_dag_builder.py | 252 ++++++++---------- 1 file changed, 111 insertions(+), 141 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 4d4ed1f..dfcea3b 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -10,6 +10,7 @@ from ea_airflow_util.providers.sharefile.transfers.sharefile_to_disk import SharefileToDiskOperator from edu_edfi_airflow.callables import s3 from ea_airflow_util.providers.aws.operators.s3 import S3ToSnowflakeOperator +from ea_airflow_util.callables.airflow import xcom_pull_template from airflow.providers.amazon.aws.hooks.s3 import S3Hook from ea_airflow_util import EACustomDAG @@ -28,19 +29,54 @@ class SharefileTransferToSnowflakeDagBuilder: """ def __init__(self, - dag_id, - airflow_default_args_dict, - file_sources_dict, - schedule_interval = None + dag_id: str, + airflow_default_args: dict, + file_sources: dict, + schedule_interval = None, + + local_base_path: str, + transform_csv_to_jsonl: bool = False, + +
sharefile_path: str, + sharefile_conn_id: str, + delete_remote: bool = False, + + delete_local_csv: bool = False, + + base_s3_destination_key: str, + s3_conn_id: str, + + snowflake_conn_id: str, + database: str, + schema: str, + full_refresh: bool, + ): self.dag_id = dag_id - self.airflow_default_args = airflow_default_args_dict - self.file_sources = file_sources_dict + self.airflow_default_args = airflow_default_args + self.file_sources = file_sources self.schedule_interval = schedule_interval + self.local_base_path = local_base_path + self.transform_csv_to_jsonl = transform_csv_to_jsonl + + self.sharefile_path = sharefile_path + self.sharefile_conn_id = sharefile_conn_id + self.delete_remote = delete_remote + + self.delete_local_csv = delete_local_csv + + self.base_s3_destination_key = base_s3_destination_key + self.s3_conn_id = s3_conn_id + + self.snowflake_conn_id = snowflake_conn_id + self.database = database + self.schema = schema + self.full_refresh = full_refresh + self.params_dict = { "file_sources": Param( - default=list(self.file_sources.keys()), + default=list(self.file_sources.keys()) if self.file_sources else [], examples=list(self.file_sources.keys()), type="array", description="Newline-separated list of file sources to pull from ShareFile", @@ -48,138 +84,72 @@ def __init__(self, } self.dag = EACustomDAG(dag_id=self.dag_id, - params=self.params_dict, - default_args=self.airflow_default_args, - schedule_interval=self.schedule_interval - ) - - self.pull_date = datetime.now().strftime('%Y%m%d') - self.pull_timestamp = datetime.now().strftime('%Y%m%dT%H%M%S') - - def check_if_file_in_params(self, file): - """ - Checks if the specified file exists in the provided Airflow parameters list. - - Args: - file (str): The name of the file to check in the parameters. - - Returns: - PythonOperator: The Airflow task that checks if the file is in the parameters list. 
- """ - return PythonOperator( - task_id=f"check_{file}", - python_callable=skip_if_not_in_params_list, - op_kwargs={ - 'param_name': "file_sources", - 'value': file + default_args=self.airflow_default_args, + schedule_interval=self.schedule_interval, + params=self.params_dict, + catchup=False + ) + + def build_sharefile_to_snowflake_dag(self, **kwargs): + + for file in self.file_sources.keys(): + + check_if_file_in_param = PythonOperator( + task_id=f"check_{file}", + python_callable=skip_if_not_in_params_list, + op_kwargs={ + 'param_name': "file_sources", + 'value': file + }, + dag=self.dag + ) + + transfer_sharefile_to_disk = SharefileToDiskOperator( + task_id=f"transfer_{file}_to_disk", + sharefile_conn_id=self.sharefile_conn_id, + sharefile_path=self.sharefile_path, + local_path=os.path.join(self.local_base_path, '{{ds_nodash}}', '{{ts_nodash}}', file), + delete_remote=self.delete_remote, + dag=self.dag + ) + + transform_to_jsonl = PythonOperator( + task_id=f"transform_{file}_to_jsonl", + python_callable=jsonl.translate_csv_file_to_jsonl, + op_kwargs={ + 'local_path': xcom_pull_template(transfer_sharefile_to_disk), + 'output_path': None, + 'delete_csv': self.delete_local_csv, + 'metadata_dict': {'file_source': file}, + }, + dag=self.dag + ) + + transfer_disk_to_s3 = PythonOperator( + task_id=f"transfer_{file}_to_s3", + python_callable=s3.local_filepath_to_s3, + op_kwargs={ + 'local_filepath': xcom_pull_template(transfer_sharefile_to_disk), + 's3_destination_key': f"{self.base_s3_destination_key}/" + '{{ds_nodash}}' + "/" + '{{ts_nodash}}' + f"/{file}", + 's3_conn_id': self.s3_conn_id }, - dag=self.dag - ) - - def transfer_sharefile_to_disk(self, file, sharefile_conn_id, - sharefile_path, local_base_path, delete_remote): - """ - Transfers a file from ShareFile to a local disk. - - Args: - file (str): The name of the file to transfer. - sharefile_conn_id (str): The Airflow connection ID for ShareFile. 
- sharefile_path (str): The file path in ShareFile from where the file will be fetched. - base_local_path (str): The root directory where files should be stored. - delete_remote (bool): Flag indicating whether to delete the remote file after transfer. - - Returns: - SharefileToDiskOperator: The Airflow task to transfer the file from ShareFile to local disk. - """ - # structured_local_path = self.build_structured_path(local_base_path, file) - - # os.makedirs(os.path.dirname(structured_local_path), exist_ok=True) - - return SharefileToDiskOperator( - task_id=f"transfer_{file}_to_disk", - sharefile_conn_id=sharefile_conn_id, - sharefile_path=sharefile_path, - # local_path=structured_local_path, - local_path=os.path.join(local_base_path, '{{ds_nodash}}', '{{ts_nodash}}', file), - delete_remote=delete_remote, - dag=self.dag - ) - - def transform_to_jsonl(self, file, local_path, delete_csv - ) -> PythonOperator: - """ - Transforms a CSV file to JSONL format. - - Args: - file (str): The name of the file to transform. - local_path (str): The local file path of the CSV file to be transformed. - delete_csv (bool): Flag indicating whether to delete the CSV file after transformation. - - Returns: - PythonOperator: The Airflow task that transforms the CSV to JSONL. - """ - return PythonOperator( - task_id=f"transform_{file}_to_jsonl", - python_callable=jsonl.translate_csv_file_to_jsonl, - op_kwargs={ - 'local_path': local_path, - 'output_path': None, - 'delete_csv': delete_csv, - 'metadata_dict': {'file_source': file}, - }, - dag=self.dag - ) - - def transfer_disk_to_s3(self, file, local_path, base_s3_destination_key, s3_conn_id - ) -> PythonOperator: - """ - Transfers a file from local disk to Amazon S3. - - Args: - file (str): The name of the file to transfer. - local_path (str): The local directory path where the files are saved. - s3_conn_id (str): The Airflow connection ID for AWS S3. 
- - Returns: - PythonOperator: The Airflow task to transfer the file from local disk to S3. - """ - # structured_s3_key = self.build_structured_path(base_s3_destination_key, file, separator="/") - - return PythonOperator( - task_id=f"transfer_{file}_to_s3", - python_callable=s3.local_filepath_to_s3, - op_kwargs={ - 'local_filepath': local_path, - 's3_destination_key': f"{base_s3_destination_key}/" + '{{ds_nodash}}' + "/" + '{{ts_nodash}}' + f"/{file}", - 's3_conn_id': s3_conn_id - }, - dag=self.dag - ) - - def transfer_s3_to_snowflake(self, file, snowflake_conn_id, database, - schema, base_s3_destination_key, full_refresh): - """ - Transfers a file from Amazon S3 to Snowflake. - - Args: - file (str): The name of the file to transfer. - snowflake_conn_id (str): The Airflow connection ID for Snowflake. - database (str): The Snowflake database where the data will be loaded. - schema (str): The Snowflake schema where the data will be loaded. - full_refresh (bool): Flag to determine whether to perform a full refresh of the data. - - Returns: - S3ToSnowflakeOperator: The Airflow task to transfer the file from S3 to Snowflake. 
- """ - return S3ToSnowflakeOperator( - task_id=f"{file}_s3_to_snowflake", - snowflake_conn_id=snowflake_conn_id, - database=database, - schema=schema, - table_name=file, - s3_destination_key=base_s3_destination_key, - full_refresh=full_refresh, - dag=self.dag - ) - - \ No newline at end of file + dag=self.dag + ) + + transfer_s3_to_snowflake = S3ToSnowflakeOperator( + task_id=f"{file}_s3_to_snowflake", + snowflake_conn_id=self.snowflake_conn_id, + database=self.database, + schema=self.schema, + table_name=file, + s3_destination_key=self.base_s3_destination_key, + full_refresh=self.full_refresh, + dag=self.dag + ) + + if self.transform_csv_to_jsonl: + check_if_file_in_param >> transfer_sharefile_to_disk >> transform_to_jsonl >> transfer_disk_to_s3 >> transfer_s3_to_snowflake + else: + check_if_file_in_param >> transfer_sharefile_to_disk >> transfer_disk_to_s3 >> transfer_s3_to_snowflake + + return self.dag \ No newline at end of file From 9a90112575d461ab0fa4f45758c6a3e2ac8875dd Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Thu, 13 Mar 2025 16:15:32 -0500 Subject: [PATCH 25/38] updated sharefile sources --- ea_airflow_util/dags/s3_to_snowflake_dag.py | 2 +- .../dags/sharefile_to_snowflake_dag_builder.py | 15 +++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/ea_airflow_util/dags/s3_to_snowflake_dag.py b/ea_airflow_util/dags/s3_to_snowflake_dag.py index a9e8c71..9513c58 100644 --- a/ea_airflow_util/dags/s3_to_snowflake_dag.py +++ b/ea_airflow_util/dags/s3_to_snowflake_dag.py @@ -36,7 +36,7 @@ def __init__(self, is_manual_upload: bool = False, pool: str, - full_replace: bool = False, #TODO once on latest version of airflow, use dagrun parameter to allow full_replace runs even if not set here at dag level + full_replace: bool = False, do_delete_from_source: bool = True, **kwargs diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 
dfcea3b..43f6e87 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -37,7 +37,7 @@ def __init__(self, local_base_path: str, transform_csv_to_jsonl: bool = False, - sharefile_path: str, + # sharefile_path: str, sharefile_conn_id: str, delete_remote: bool = False, @@ -50,8 +50,7 @@ def __init__(self, database: str, schema: str, full_refresh: bool, - - ): + ): self.dag_id = dag_id self.airflow_default_args = airflow_default_args self.file_sources = file_sources @@ -60,7 +59,7 @@ def __init__(self, self.local_base_path = local_base_path self.transform_csv_to_jsonl = transform_csv_to_jsonl - self.sharefile_path = sharefile_path + # self.sharefile_path = sharefile_path self.sharefile_conn_id = sharefile_conn_id self.delete_remote = delete_remote @@ -80,8 +79,8 @@ def __init__(self, examples=list(self.file_sources.keys()), type="array", description="Newline-separated list of file sources to pull from ShareFile", - ), - } + ), + } self.dag = EACustomDAG(dag_id=self.dag_id, default_args=self.airflow_default_args, @@ -92,7 +91,7 @@ def __init__(self, def build_sharefile_to_snowflake_dag(self, **kwargs): - for file in self.file_sources.keys(): + for file, details in self.file_sources.items(): check_if_file_in_param = PythonOperator( task_id=f"check_{file}", @@ -107,7 +106,7 @@ def build_sharefile_to_snowflake_dag(self, **kwargs): transfer_sharefile_to_disk = SharefileToDiskOperator( task_id=f"transfer_{file}_to_disk", sharefile_conn_id=self.sharefile_conn_id, - sharefile_path=self.sharefile_path, + sharefile_path=details['sharefile_path'], local_path=os.path.join(self.local_base_path, '{{ds_nodash}}', '{{ts_nodash}}', file), delete_remote=self.delete_remote, dag=self.dag From bd8408ea26607c10abc4b0a309dfcee19f19d934 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Fri, 14 Mar 2025 11:41:57 -0500 Subject: [PATCH 26/38] re-ordered init arguments --- 
.../sharefile_to_snowflake_dag_builder.py | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 43f6e87..7786c6c 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -1,5 +1,6 @@ -import os from datetime import datetime +import logging +import os from airflow import DAG from airflow.models import Param @@ -15,6 +16,10 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook from ea_airflow_util import EACustomDAG +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) class SharefileTransferToSnowflakeDagBuilder: """ @@ -27,29 +32,22 @@ class SharefileTransferToSnowflakeDagBuilder: file_sources (list): List of file sources to be processed. schedule_interval (str or None): The schedule interval for the DAG. 
""" - def __init__(self, dag_id: str, airflow_default_args: dict, file_sources: dict, - schedule_interval = None, - local_base_path: str, - transform_csv_to_jsonl: bool = False, - - # sharefile_path: str, sharefile_conn_id: str, - delete_remote: bool = False, - - delete_local_csv: bool = False, - base_s3_destination_key: str, s3_conn_id: str, - snowflake_conn_id: str, database: str, schema: str, full_refresh: bool, + schedule_interval = None, + transform_csv_to_jsonl: bool = False, + delete_remote: bool = False, + delete_local_csv: bool = False ): self.dag_id = dag_id self.airflow_default_args = airflow_default_args @@ -59,7 +57,6 @@ def __init__(self, self.local_base_path = local_base_path self.transform_csv_to_jsonl = transform_csv_to_jsonl - # self.sharefile_path = sharefile_path self.sharefile_conn_id = sharefile_conn_id self.delete_remote = delete_remote @@ -73,6 +70,8 @@ def __init__(self, self.schema = schema self.full_refresh = full_refresh + self.logger = logging.getLogger(self.__class__.__name__) + self.params_dict = { "file_sources": Param( default=list(self.file_sources.keys()) if self.file_sources else [], From bd50225a7687c611957d4147e819fcfcb6312d07 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Thu, 20 Mar 2025 10:20:42 -0500 Subject: [PATCH 27/38] update to readme documentation --- README.md | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/README.md b/README.md index f9809a4..1d68ed9 100644 --- a/README.md +++ b/README.md @@ -623,6 +623,39 @@ For example, `/ed-fi/apiClients/districts-2425-ds5/{tenant_code}/prod/Stadium` w +## SharefileTransferToSnowflakeDagBuilder +`SharefileTransferToSnowflakeDag` is an Airflow DAG that automates the process of transferring files from ShareFile to Snowflake. The DAG retrieves files from a specified ShareFile location, optionally transforms CSV files into JSONL format, uploads the files to an S3 bucket, and finally loads the data into a Snowflake database. + +
+Arguments: + +----- + +| Argument | Description | +|-------------------------|--------------------------------------------------------------------------| +| dag_id | Unique identifier for the DAG | +| airflow_default_args | Default Airflow arguments for the DAG | +| file_sources | Dictionary of file sources and their ShareFile paths | +| local_base_path | Base path for storing downloaded files locally | +| sharefile_conn_id | Airflow connection ID for ShareFile | +| base_s3_destination_key | S3 key prefix for storing uploaded files | +| s3_conn_id | Airflow connection ID for AWS S3 | +| snowflake_conn_id | Airflow connection ID for Snowflake | +| database | Snowflake database where data will be loaded | +| schema | Snowflake schema where data will be loaded | +| full_refresh | Boolean flag indicating whether to perform a full table refresh | +| schedule_interval | DAG execution schedule (default is None) | +| transform_csv_to_jsonl | Boolean flag to convert CSV files to JSONL before uploading to S3 | +| delete_remote | Boolean flag to delete the original file from ShareFile after transfer | +| delete_local_csv | Boolean flag to delete the local CSV file after transformation | + +Additional `EACustomDAG` arguments (e.g. `slack_conn_id`) can be passed as kwargs. + +----- + +
+ +This DAG ensures a seamless pipeline for moving data from ShareFile to Snowflake, supporting transformations and cleanup along the way. # Providers From 6eff93acfe2aab443ae5253807ba0f528d8356f9 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Thu, 20 Mar 2025 10:32:53 -0500 Subject: [PATCH 28/38] updated readme with yaml file showing file_sources --- README.md | 103 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 102 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 1d68ed9..319dbb8 100644 --- a/README.md +++ b/README.md @@ -655,7 +655,108 @@ Additional `EACustomDAG` arguments (e.g. `slack_conn_id`) can be passed as kwarg -This DAG ensures a seamless pipeline for moving data from ShareFile to Snowflake, supporting transformations and cleanup along the way. +```yaml +default_args: + owner: airflow + run_as_user: null + depends_on_past: False + start_date: '2020-05-17' + email: + - mberrien@edanalytics.org + email_on_failure: False + retries: 0 + trigger_rule: all_success + retry_delay: !timedelta 300 # 5 minutes + execution_timeout: !timedelta 21600 # 6 hours + sla: !timedelta 86400 # 24 hours + +connections: + s3: + s3_conn_id: data_lake + s3_bucket : stadium-txexchange-airflow-dev-datalake + s3_region : us-east-2 + base_s3_destination_key: ea_research + snowflake: + snowflake_conn_id: snowflake + snowflake_stage : stadium-txexchange-airflow-dev-datalake + snowflake_db : dev_raw + snowflake_schema : ea_research + sharefile: + sharefile_conn_id: sharefile + sharefile_base_path: /Texas Education Exchange/ea_research/ + +variables: + tmp_dir: /efs/tmp_storage/sharefile/ea_research + local_path: sharefile + schedule_interval: None + delete_remote: False + +file_sources: + prediction_linking_parameters: + sharefile_path: /Texas Education Exchange/ea_research/prediction_linking_parameters/ + dest_table: ea_research.prediction_linking_parameters + truncate: True + colnames: + - state + - display_title + - 
display_subject + - assessed_grade_level + - interim_scale_score + - summative_scale_score + - summative_scale_score_rounded + - school_year + - assessment_identifier + - file_path + - update_timestamp + + prediction_calibration_parameters: + sharefile_path: /Texas Education Exchange/ea_research/prediction_calibration_parameters/ + dest_table: ea_research.prediction_calibration_parameters + truncate: True + colnames: + - model_abbrev + - variable + - coefficient + - file_path + - update_timestamp + + prediction_models: + sharefile_path: /Texas Education Exchange/ea_research/prediction_models/ + dest_table: ea_research.prediction_models + truncate: True + colnames: + - predicted_abbrev + - model_state + - model_abbrev + - predicted_test_name + - predicted_year + - predicted_subject + - predicted_grade + - predicted_season + - predicted_type + - associated_interim + - pretest_combo + - file_path + - update_timestamp + + prediction_pretest_code_xwalk: + sharefile_path: /Texas Education Exchange/ea_research/prediction_pretest_code_xwalk/ + dest_table: ea_research.prediction_pretest_code_xwalk + truncate: True + colnames: + - predicted_year + - predicted_season + - predicted_grade + - predicted_type + - pretest_code + - pretest_abbrev + - pretest_type + - pretest_grade + - pretest_year + - pretest_season + - file_path + - update_timestamp +``` # Providers From df7cc1446b7590c8c7fa26edb78a3bb8e73ffb1d Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Thu, 20 Mar 2025 10:39:10 -0500 Subject: [PATCH 29/38] added additional information to yaml section of readme --- README.md | 124 +++++++++++++++++------------------------------------- 1 file changed, 39 insertions(+), 85 deletions(-) diff --git a/README.md b/README.md index 319dbb8..6779585 100644 --- a/README.md +++ b/README.md @@ -635,7 +635,7 @@ For example, `/ed-fi/apiClients/districts-2425-ds5/{tenant_code}/prod/Stadium` w 
|-------------------------|--------------------------------------------------------------------------| | dag_id | Unique identifier for the DAG | | airflow_default_args | Default Airflow arguments for the DAG | -| file_sources | Dictionary of file sources and their ShareFile paths | +| file_sources | Dictionary of file sources and their ShareFile paths (more info in Example Yaml File section) | | local_base_path | Base path for storing downloaded files locally | | sharefile_conn_id | Airflow connection ID for ShareFile | | base_s3_destination_key | S3 key prefix for storing uploaded files | @@ -655,109 +655,63 @@ Additional `EACustomDAG` arguments (e.g. `slack_conn_id`) can be passed as kwarg +
+Example Yaml File: + ```yaml default_args: - owner: airflow - run_as_user: null - depends_on_past: False - start_date: '2020-05-17' + owner: + run_as_user: + depends_on_past: + start_date: email: - - mberrien@edanalytics.org email_on_failure: False retries: 0 - trigger_rule: all_success - retry_delay: !timedelta 300 # 5 minutes - execution_timeout: !timedelta 21600 # 6 hours - sla: !timedelta 86400 # 24 hours + trigger_rule: + retry_delay: + execution_timeout: + sla: connections: s3: - s3_conn_id: data_lake - s3_bucket : stadium-txexchange-airflow-dev-datalake - s3_region : us-east-2 - base_s3_destination_key: ea_research + s3_conn_id: + s3_bucket : + s3_region : + base_s3_destination_key: snowflake: - snowflake_conn_id: snowflake - snowflake_stage : stadium-txexchange-airflow-dev-datalake - snowflake_db : dev_raw - snowflake_schema : ea_research + snowflake_conn_id: + snowflake_stage : + snowflake_db : + snowflake_schema : sharefile: - sharefile_conn_id: sharefile - sharefile_base_path: /Texas Education Exchange/ea_research/ + sharefile_conn_id: + sharefile_base_path: variables: - tmp_dir: /efs/tmp_storage/sharefile/ea_research - local_path: sharefile - schedule_interval: None - delete_remote: False + tmp_dir: + local_path: + schedule_interval: + delete_remote: file_sources: - prediction_linking_parameters: - sharefile_path: /Texas Education Exchange/ea_research/prediction_linking_parameters/ - dest_table: ea_research.prediction_linking_parameters - truncate: True - colnames: - - state - - display_title - - display_subject - - assessed_grade_level - - interim_scale_score - - summative_scale_score - - summative_scale_score_rounded - - school_year - - assessment_identifier - - file_path - - update_timestamp - - prediction_calibration_parameters: - sharefile_path: /Texas Education Exchange/ea_research/prediction_calibration_parameters/ - dest_table: ea_research.prediction_calibration_parameters + filename1: + sharefile_path: /Path/to/document/folder/ + dest_table: 
schema.table truncate: True colnames: - - model_abbrev - - variable - - coefficient - - file_path - - update_timestamp - - prediction_models: - sharefile_path: /Texas Education Exchange/ea_research/prediction_models/ - dest_table: ea_research.prediction_models + - col1 + - col2 + - col3 + filename2: + sharefile_path: /Path/to/document/folder/ + dest_table: schema.table truncate: True colnames: - - predicted_abbrev - - model_state - - model_abbrev - - predicted_test_name - - predicted_year - - predicted_subject - - predicted_grade - - predicted_season - - predicted_type - - associated_interim - - pretest_combo - - file_path - - update_timestamp - - prediction_pretest_code_xwalk: - sharefile_path: /Texas Education Exchange/ea_research/prediction_pretest_code_xwalk/ - dest_table: ea_research.prediction_pretest_code_xwalk - truncate: True - colnames: - - predicted_year - - predicted_season - - predicted_grade - - predicted_type - - pretest_code - - pretest_abbrev - - pretest_type - - pretest_grade - - pretest_year - - pretest_season - - file_path - - update_timestamp + - col1 + - col2 + - col3 ``` - +
# Providers Finally, this package contains a handful of custom DBT operators to be used as an alternative to PythonOperators. From 32b37ffa9291a2275a1d70b3daea7cf72e32a4c2 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Fri, 21 Mar 2025 11:40:38 -0500 Subject: [PATCH 30/38] updated s3 to snowflake task --- .../sharefile_to_snowflake_dag_builder.py | 74 +++++++++++++------ ea_airflow_util/providers/aws/operators/s3.py | 2 +- 2 files changed, 53 insertions(+), 23 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 7786c6c..178625c 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -6,12 +6,14 @@ from airflow.models import Param from airflow.operators.python import PythonOperator +from edu_edfi_airflow.callables import s3 + from ea_airflow_util.callables.airflow import skip_if_not_in_params_list from ea_airflow_util.callables import jsonl, snowflake from ea_airflow_util.providers.sharefile.transfers.sharefile_to_disk import SharefileToDiskOperator -from edu_edfi_airflow.callables import s3 from ea_airflow_util.providers.aws.operators.s3 import S3ToSnowflakeOperator from ea_airflow_util.callables.airflow import xcom_pull_template +from ea_airflow_util.providers.sharefile.hooks.sharefile import SharefileHook from airflow.providers.amazon.aws.hooks.s3 import S3Hook from ea_airflow_util import EACustomDAG @@ -47,7 +49,9 @@ def __init__(self, schedule_interval = None, transform_csv_to_jsonl: bool = False, delete_remote: bool = False, - delete_local_csv: bool = False + delete_local_csv: bool = False, + transfer_s3_to_snowflake = None, + **kwargs ): self.dag_id = dag_id self.airflow_default_args = airflow_default_args @@ -69,6 +73,7 @@ def __init__(self, self.database = database self.schema = schema self.full_refresh = full_refresh + self.transfer_s3_to_snowflake = 
transfer_s3_to_snowflake self.logger = logging.getLogger(self.__class__.__name__) @@ -99,16 +104,35 @@ def build_sharefile_to_snowflake_dag(self, **kwargs): 'param_name': "file_sources", 'value': file }, - dag=self.dag + dag=self.dag, + **kwargs ) - transfer_sharefile_to_disk = SharefileToDiskOperator( - task_id=f"transfer_{file}_to_disk", - sharefile_conn_id=self.sharefile_conn_id, - sharefile_path=details['sharefile_path'], - local_path=os.path.join(self.local_base_path, '{{ds_nodash}}', '{{ts_nodash}}', file), - delete_remote=self.delete_remote, - dag=self.dag + sharefile_path = details['sharefile_path'] + sharefile_hook = SharefileHook(sharefile_conn_id=self.sharefile_conn_id) + folder_id = sharefile_hook.folder_id_from_path(sharefile_path) + + if folder_id: + transfer_sharefile_to_disk = SharefileToDiskOperator( + task_id=f"transfer_{file}_to_disk", + sharefile_conn_id=self.sharefile_conn_id, + sharefile_path=details['sharefile_path'], + local_path=os.path.join(self.local_base_path, '{{ds_nodash}}', '{{ts_nodash}}', file), + delete_remote=self.delete_remote, + dag=self.dag, + **kwargs + ) + else: + def download_single_file(**context): + file_id = sharefile_hook.get_path_id(sharefile_path) + sharefile_hook.download_to_disk(file_id, os.path.join(self.local_base_path, '{{ds_nodash}}', '{{ts_nodash}}', file)) + + transfer_sharefile_to_disk = PythonOperator( + task_id=f"download_{file}_to_disk", + python_callable=download_single_file, + provide_context=True, + dag=self.dag, + **kwargs ) transform_to_jsonl = PythonOperator( @@ -120,7 +144,8 @@ def build_sharefile_to_snowflake_dag(self, **kwargs): 'delete_csv': self.delete_local_csv, 'metadata_dict': {'file_source': file}, }, - dag=self.dag + dag=self.dag, + **kwargs ) transfer_disk_to_s3 = PythonOperator( @@ -131,19 +156,24 @@ def build_sharefile_to_snowflake_dag(self, **kwargs): 's3_destination_key': f"{self.base_s3_destination_key}/" + '{{ds_nodash}}' + "/" + '{{ts_nodash}}' + f"/{file}", 's3_conn_id': 
self.s3_conn_id }, - dag=self.dag + dag=self.dag, + **kwargs ) - transfer_s3_to_snowflake = S3ToSnowflakeOperator( - task_id=f"{file}_s3_to_snowflake", - snowflake_conn_id=self.snowflake_conn_id, - database=self.database, - schema=self.schema, - table_name=file, - s3_destination_key=self.base_s3_destination_key, - full_refresh=self.full_refresh, - dag=self.dag - ) + if not transfer_s3_to_snowflake: + transfer_s3_to_snowflake = S3ToSnowflakeOperator( + task_id=f"{file}_s3_to_snowflake", + snowflake_conn_id=self.snowflake_conn_id, + database=self.database, + schema=self.schema, + table_name=file, + s3_destination_key=self.base_s3_destination_key, + full_refresh=self.full_refresh, + dag=self.dag, + **kwargs + ) + else: + transfer_s3_to_snowflake = transfer_s3_to_snowflake if self.transform_csv_to_jsonl: check_if_file_in_param >> transfer_sharefile_to_disk >> transform_to_jsonl >> transfer_disk_to_s3 >> transfer_s3_to_snowflake diff --git a/ea_airflow_util/providers/aws/operators/s3.py b/ea_airflow_util/providers/aws/operators/s3.py index d680f20..683ad00 100644 --- a/ea_airflow_util/providers/aws/operators/s3.py +++ b/ea_airflow_util/providers/aws/operators/s3.py @@ -186,7 +186,7 @@ def execute(self, context): TO_TIMESTAMP(REGEXP_SUBSTR(metadata$filename, '{ts_regex}'), 'YYYYMMDDTHH24MISS') AS pull_timestamp, metadata$file_row_number AS file_row_number, metadata$filename AS filename, - {metadata_columns if metadata_columns else ''} + {metadata_columns} t.$1 AS v FROM @{self.database}.util.airflow_stage/{self.s3_destination_key} (file_format => 'json_default') t From e791ca207fdc962f5c3e3cf2d5d723648c5c4e32 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Fri, 21 Mar 2025 15:25:07 -0500 Subject: [PATCH 31/38] updated sharefile to snowflake dag builder --- .../sharefile_to_snowflake_dag_builder.py | 46 ++++++++++++++----- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py 
b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 178625c..26567fe 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -28,11 +28,24 @@ class SharefileTransferToSnowflakeDagBuilder: This class is responsible for creating an Apache Airflow DAG that automates the process of transferring files from ShareFile to Snowflake. - Attributes: - dag_id (str): The ID for the Airflow DAG. - airflow_default_args (dict): Default arguments passed to the DAG. - file_sources (list): List of file sources to be processed. - schedule_interval (str or None): The schedule interval for the DAG. + Parameters: + - dag_id (str): ID of the DAG to be created. + - airflow_default_args (dict): Default arguments to pass to the DAG. + - file_sources (dict): A mapping of file source names to their ShareFile paths. + - local_base_path (str): Base local path for downloading files. + - sharefile_conn_id (str): Airflow connection ID for ShareFile. + - base_s3_destination_key (str): Base S3 key (path prefix) for uploads. + - s3_conn_id (str): Airflow connection ID for AWS S3. + - snowflake_conn_id (str): Airflow connection ID for Snowflake. + - database (str): Snowflake database name. + - schema (str): Snowflake schema name. + - full_refresh (bool): If True, performs a full refresh load in Snowflake. + - schedule_interval (str or timedelta, optional): Airflow schedule interval for the DAG. + - transform_csv_to_jsonl (bool, optional): If True, converts downloaded CSV files to JSONL format. + - delete_remote (bool, optional): If True, deletes the original file from ShareFile after download. + - delete_local_csv (bool, optional): If True, deletes the CSV after transforming to JSONL. + - transfer_s3_to_snowflake (BaseOperator, optional): Custom operator for the S3 to Snowflake transfer step. + - **kwargs: Additional arguments passed to Airflow operators. 
""" def __init__(self, dag_id: str, @@ -94,7 +107,17 @@ def __init__(self, ) def build_sharefile_to_snowflake_dag(self, **kwargs): - + """ + Builds the DAG with tasks for each file source, including: + - Conditional execution based on DAG parameters. + - Downloading from ShareFile to local disk (supports both folder and single file paths). + - Optional transformation from CSV to JSONL. + - Upload to S3. + - Load into Snowflake using the default or a custom operator. + + Returns: + airflow.DAG: The constructed Airflow DAG instance. + """ for file, details in self.file_sources.items(): check_if_file_in_param = PythonOperator( @@ -109,8 +132,9 @@ def build_sharefile_to_snowflake_dag(self, **kwargs): ) sharefile_path = details['sharefile_path'] - sharefile_hook = SharefileHook(sharefile_conn_id=self.sharefile_conn_id) - folder_id = sharefile_hook.folder_id_from_path(sharefile_path) + sf_hook = SharefileHook(sharefile_conn_id=self.sharefile_conn_id) + sf_hook.get_conn() + folder_id = sf_hook.folder_id_from_path(sharefile_path) if folder_id: transfer_sharefile_to_disk = SharefileToDiskOperator( @@ -124,8 +148,8 @@ def build_sharefile_to_snowflake_dag(self, **kwargs): ) else: def download_single_file(**context): - file_id = sharefile_hook.get_path_id(sharefile_path) - sharefile_hook.download_to_disk(file_id, os.path.join(self.local_base_path, '{{ds_nodash}}', '{{ts_nodash}}', file)) + file_id = sf_hook.get_path_id(sharefile_path) + sf_hook.download_to_disk(file_id, os.path.join(self.local_base_path, '{{ds_nodash}}', '{{ts_nodash}}', file)) transfer_sharefile_to_disk = PythonOperator( task_id=f"download_{file}_to_disk", @@ -160,7 +184,7 @@ def download_single_file(**context): **kwargs ) - if not transfer_s3_to_snowflake: + if not self.transfer_s3_to_snowflake: transfer_s3_to_snowflake = S3ToSnowflakeOperator( task_id=f"{file}_s3_to_snowflake", snowflake_conn_id=self.snowflake_conn_id, From 0f4ef7c5314280409c5636bab04361b06f669605 Mon Sep 17 00:00:00 2001 From: 
mberrien-fitzsimons Date: Mon, 24 Mar 2025 10:18:19 -0500 Subject: [PATCH 32/38] added print statement for folder ID to see what is happening --- ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 26567fe..076d1f8 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -135,6 +135,8 @@ def build_sharefile_to_snowflake_dag(self, **kwargs): sf_hook = SharefileHook(sharefile_conn_id=self.sharefile_conn_id) sf_hook.get_conn() folder_id = sf_hook.folder_id_from_path(sharefile_path) + + print(f"Folder ID: {folder_id}") if folder_id: transfer_sharefile_to_disk = SharefileToDiskOperator( From 6f57bf666482295ffb03fae808a7a547141aa7a7 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Mon, 24 Mar 2025 12:37:41 -0500 Subject: [PATCH 33/38] updated transfer_to_s3 --- .../sharefile_to_snowflake_dag_builder.py | 40 ++++++------------- 1 file changed, 12 insertions(+), 28 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 076d1f8..dda78cb 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -105,6 +105,10 @@ def __init__(self, params=self.params_dict, catchup=False ) + + def download_from_sharefile(**context): + + def build_sharefile_to_snowflake_dag(self, **kwargs): """ @@ -130,35 +134,15 @@ def build_sharefile_to_snowflake_dag(self, **kwargs): dag=self.dag, **kwargs ) - - sharefile_path = details['sharefile_path'] - sf_hook = SharefileHook(sharefile_conn_id=self.sharefile_conn_id) - sf_hook.get_conn() - folder_id = sf_hook.folder_id_from_path(sharefile_path) - print(f"Folder ID: {folder_id}") - - if folder_id: - 
transfer_sharefile_to_disk = SharefileToDiskOperator( - task_id=f"transfer_{file}_to_disk", - sharefile_conn_id=self.sharefile_conn_id, - sharefile_path=details['sharefile_path'], - local_path=os.path.join(self.local_base_path, '{{ds_nodash}}', '{{ts_nodash}}', file), - delete_remote=self.delete_remote, - dag=self.dag, - **kwargs - ) - else: - def download_single_file(**context): - file_id = sf_hook.get_path_id(sharefile_path) - sf_hook.download_to_disk(file_id, os.path.join(self.local_base_path, '{{ds_nodash}}', '{{ts_nodash}}', file)) - - transfer_sharefile_to_disk = PythonOperator( - task_id=f"download_{file}_to_disk", - python_callable=download_single_file, - provide_context=True, - dag=self.dag, - **kwargs + transfer_sharefile_to_disk = SharefileToDiskOperator( + task_id=f"transfer_{file}_to_disk", + sharefile_conn_id=self.sharefile_conn_id, + sharefile_path=details['sharefile_path'], + local_path=os.path.join(self.local_base_path, '{{ds_nodash}}', '{{ts_nodash}}', file), + delete_remote=self.delete_remote, + dag=self.dag, + **kwargs ) transform_to_jsonl = PythonOperator( From 344ada0ca1ef3c2b880ff12d88810acf32bb096b Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Mon, 24 Mar 2025 12:39:46 -0500 Subject: [PATCH 34/38] removed partial method --- ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index dda78cb..983b0fc 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -106,10 +106,6 @@ def __init__(self, catchup=False ) - def download_from_sharefile(**context): - - - def build_sharefile_to_snowflake_dag(self, **kwargs): """ Builds the DAG with tasks for each file source, including: From 950ca35378adcd473c61ec0e5614824b62b8a65d Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Tue, 
25 Mar 2025 13:32:47 -0500 Subject: [PATCH 35/38] added control flow for choosing transfer from s3 to snowflake step --- .../sharefile_to_snowflake_dag_builder.py | 28 +++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 983b0fc..10938fe 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -1,6 +1,7 @@ -from datetime import datetime import logging import os +from datetime import datetime +from typing import Callable, List, Optional, Union from airflow import DAG from airflow.models import Param @@ -105,6 +106,29 @@ def __init__(self, params=self.params_dict, catchup=False ) + + def build_python_operator(self, + python_callable: Callable, + **kwargs + ) -> PythonOperator: + """ + Optional Python preprocessing operator to run before Earthmover and Lightbeam.
+ + :param python_callable: + :param kwargs: + :return: + """ + callable_name = python_callable.__name__.strip('<>') # Remove brackets around lambdas + task_id = f"{self.run_type}__preprocess_python_callable__{callable_name}" + + return PythonOperator( + task_id=task_id, + python_callable=python_callable, + op_kwargs=kwargs, + provide_context=True, + pool=self.pool, + dag=self.dag + ) def build_sharefile_to_snowflake_dag(self, **kwargs): """ @@ -179,7 +203,7 @@ def build_sharefile_to_snowflake_dag(self, **kwargs): **kwargs ) else: - transfer_s3_to_snowflake = transfer_s3_to_snowflake + transfer_s3_to_snowflake = self.build_python_operator(self.transfer_s3_to_snowflake) if self.transform_csv_to_jsonl: check_if_file_in_param >> transfer_sharefile_to_disk >> transform_to_jsonl >> transfer_disk_to_s3 >> transfer_s3_to_snowflake From 3dab30f42f7cb2dd517b1834e68399ea7ed7f612 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Thu, 27 Mar 2025 09:23:37 -0500 Subject: [PATCH 36/38] updated sharefile to working state --- ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 10938fe..313ecb3 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -10,7 +10,7 @@ from edu_edfi_airflow.callables import s3 from ea_airflow_util.callables.airflow import skip_if_not_in_params_list -from ea_airflow_util.callables import jsonl, snowflake +from ea_airflow_util.callables import jsonl, snowhttps://github.com/edanalytics/ea_airflow_util/actionsflake from ea_airflow_util.providers.sharefile.transfers.sharefile_to_disk import SharefileToDiskOperator from ea_airflow_util.providers.aws.operators.s3 import S3ToSnowflakeOperator from ea_airflow_util.callables.airflow import xcom_pull_template @@ -106,7 +106,7 
@@ def __init__(self, params=self.params_dict, catchup=False ) - + def build_python_operator(self, python_callable: Callable, **kwargs From f76a18a43a9bb3622f9bbbd9366f86d7a5a09e47 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Thu, 27 Mar 2025 12:31:07 -0500 Subject: [PATCH 37/38] updated file to handle both single file and folder. testing in TX dev server --- .../sharefile/transfers/sharefile_to_disk.py | 185 +++++++++++------- 1 file changed, 110 insertions(+), 75 deletions(-) diff --git a/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py b/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py index 70184b9..6b84afd 100644 --- a/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py +++ b/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py @@ -50,92 +50,127 @@ def execute(self, **context): sf_hook = SharefileHook(sharefile_conn_id=self.sharefile_conn_id) sf_hook.get_conn() - # get the item id of the remote path, find all files within that path (up to 1000) - base_path_id = sf_hook.get_path_id(self.sharefile_path) - remote_files = sf_hook.find_files(base_path_id) - - # check whether we found anything - if len(remote_files) == 0: - self.log.info("No files on FTP") - raise AirflowSkipException - - # extract relevant file details - files = [] - for res in remote_files: - file_details = { - 'file_name': res['FileName'], - 'size': res['Size'], - 'parent_id': res['ParentID'], - 'file_path_no_base': res['ParentSemanticPath'].replace(self.sharefile_path, ''), - 'file_path_ftp': res['ParentSemanticPath'], - 'item_id': res['ItemID'] - } - - - files.append(file_details) - - if self.most_recent_file: - # of files found in directory, find the one with the most recent edit timestamp - max_timestamp = None - chosen_file = [] - for res in files: - # seem to be cases where search is out of date and returns items that don't exist - try: - item_info = sf_hook.item_info(res['item_id']) - except: - # if the item fails 
to fetch item info, it probably doesn't exist, so can't be most recent - continue - # grab last modified, compare to current max known - item_last_modified = item_info['ProgenyEditDate'] - if max_timestamp is None or item_last_modified > max_timestamp: - max_timestamp = item_last_modified - chosen_file = [res] - # overwrite files list with singular chosen item - files = chosen_file - - - # for all files, move to local - num_successes = 0 - - for file in files: + # get the item ID of the privided path (either file or folder) + item_id = sf_hook.get_path_id(self.sharefile_path) + if not item_id: + raise AirflowException(f"Cound not find item ID for path: {self.sharefile_path}") + + try: + item_info = sf_hook.item_info(item_id) + except Exception as e: + raise AirflowException(f"Failed to retrieve metadata for item {item_id}: {e}") + + # Case 1: single file + if item_info["odata.type"] == "ShareFile.Api.Models.File": + file_name = item_info['FileName'] + full_local_path = os.path.join(self.local_path, file_name) - remote_file = os.path.join(file['file_path_ftp'], file['file_name']) - self.log.info("Attempting to get file " + remote_file) - - # lower filename and replace spaces with underscores - file['file_name'] = file['file_name'].lower().replace(' ', '_') - - # check to see if there is other metadata needed in local path and if not, add filename to local path - if file['parent_id'] == base_path_id: - full_local_path = os.path.join(self.local_path, file['file_name']) - else: - full_local_path = os.path.join(self.local_path, file['file_path_no_base'], file['file_name']) - - # create dir (works if there is a file name or not) os.makedirs(os.path.dirname(full_local_path), exist_ok=True) - # download the file try: - sf_hook.download_to_disk(item_id=file['item_id'], local_path=full_local_path) - + sf_hook.download_to_disk(item_id=item_id, local_path=full_local_path) if self.delete_remote: - sf_hook.delete(file['item_id']) - - num_successes += 1 - + 
sf_hook.delete(item_id) + + return self.local_path + except Exception as err: - self.log.error(f'Failed to get file with message: {err}') - + self.log.error(f"Failed to download file {self.sharefile_path}: {err}") if slack_conn_id := context["dag"].user_defined_macros.get("slack_conn_id"): slack.slack_alert_download_failure( context=context, http_conn_id=slack_conn_id, - remote_path=remote_file, local_path=full_local_path, error=err + remote_path=self.sharefile_path, local_path=full_local_path, error=err ) + raise AirflowException("Single file transfer failed") + + else: + + # get the item id of the remote path, find all files within that path (up to 1000) + base_path_id = item_id + remote_files = sf_hook.find_files(base_path_id) + + # check whether we found anything + if len(remote_files) == 0: + self.log.info("No files on FTP") + raise AirflowSkipException + + # extract relevant file details + files = [] + for res in remote_files: + file_details = { + 'file_name': res['FileName'], + 'size': res['Size'], + 'parent_id': res['ParentID'], + 'file_path_no_base': res['ParentSemanticPath'].replace(self.sharefile_path, ''), + 'file_path_ftp': res['ParentSemanticPath'], + 'item_id': res['ItemID'] + } + + + files.append(file_details) + + if self.most_recent_file: + # of files found in directory, find the one with the most recent edit timestamp + max_timestamp = None + chosen_file = [] + for res in files: + # seem to be cases where search is out of date and returns items that don't exist + try: + item_info = sf_hook.item_info(res['item_id']) + except: + # if the item fails to fetch item info, it probably doesn't exist, so can't be most recent + continue + # grab last modified, compare to current max known + item_last_modified = item_info['ProgenyEditDate'] + if max_timestamp is None or item_last_modified > max_timestamp: + max_timestamp = item_last_modified + chosen_file = [res] + # overwrite files list with singular chosen item + files = chosen_file + + + # for all files, 
move to local + num_successes = 0 + + for file in files: + + remote_file = os.path.join(file['file_path_ftp'], file['file_name']) + self.log.info("Attempting to get file " + remote_file) + + # lower filename and replace spaces with underscores + file['file_name'] = file['file_name'].lower().replace(' ', '_') + + # check to see if there is other metadata needed in local path and if not, add filename to local path + if file['parent_id'] == base_path_id: + full_local_path = os.path.join(self.local_path, file['file_name']) + else: + full_local_path = os.path.join(self.local_path, file['file_path_no_base'], file['file_name']) + + # create dir (works if there is a file name or not) + os.makedirs(os.path.dirname(full_local_path), exist_ok=True) + + # download the file + try: + sf_hook.download_to_disk(item_id=file['item_id'], local_path=full_local_path) + + if self.delete_remote: + sf_hook.delete(file['item_id']) - continue + num_successes += 1 + + except Exception as err: + self.log.error(f'Failed to get file with message: {err}') + + if slack_conn_id := context["dag"].user_defined_macros.get("slack_conn_id"): + slack.slack_alert_download_failure( + context=context, http_conn_id=slack_conn_id, + remote_path=remote_file, local_path=full_local_path, error=err + ) + + continue - if num_successes == 0: - raise AirflowException("Failed transfer from ShareFile to local: no files transferred successfully!") + if num_successes == 0: + raise AirflowException("Failed transfer from ShareFile to local: no files transferred successfully!") - return self.local_path + return self.local_path From 5baa75bb18548f5af278cfacb12816ec10888aa4 Mon Sep 17 00:00:00 2001 From: mberrien-fitzsimons Date: Thu, 27 Mar 2025 16:54:58 -0500 Subject: [PATCH 38/38] fixed mistake in imports --- ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py 
b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py index 313ecb3..1c11b37 100644 --- a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py @@ -10,7 +10,7 @@ from edu_edfi_airflow.callables import s3 from ea_airflow_util.callables.airflow import skip_if_not_in_params_list -from ea_airflow_util.callables import jsonl, snowhttps://github.com/edanalytics/ea_airflow_util/actionsflake +from ea_airflow_util.callables import jsonl, snowflake from ea_airflow_util.providers.sharefile.transfers.sharefile_to_disk import SharefileToDiskOperator from ea_airflow_util.providers.aws.operators.s3 import S3ToSnowflakeOperator from ea_airflow_util.callables.airflow import xcom_pull_template