diff --git a/README.md b/README.md
index f9809a4..6779585 100644
--- a/README.md
+++ b/README.md
@@ -623,7 +623,95 @@ For example, `/ed-fi/apiClients/districts-2425-ds5/{tenant_code}/prod/Stadium` w
+## SharefileTransferToSnowflakeDagBuilder
+`SharefileTransferToSnowflakeDagBuilder` builds an Airflow DAG that automates the process of transferring files from ShareFile to Snowflake. The DAG retrieves files from a specified ShareFile location, optionally transforms CSV files into JSONL format, uploads the files to an S3 bucket, and finally loads the data into a Snowflake database.
+
+Arguments:
+
+-----
+
+| Argument | Description |
+|-------------------------|--------------------------------------------------------------------------|
+| dag_id | Unique identifier for the DAG |
+| airflow_default_args | Default Airflow arguments for the DAG |
+| file_sources | Dictionary of file sources and their ShareFile paths (more info in Example Yaml File section) |
+| local_base_path | Base path for storing downloaded files locally |
+| sharefile_conn_id | Airflow connection ID for ShareFile |
+| base_s3_destination_key | S3 key prefix for storing uploaded files |
+| s3_conn_id | Airflow connection ID for AWS S3 |
+| snowflake_conn_id | Airflow connection ID for Snowflake |
+| database | Snowflake database where data will be loaded |
+| schema | Snowflake schema where data will be loaded |
+| full_refresh | Boolean flag indicating whether to perform a full table refresh |
+| schedule_interval | DAG execution schedule (default is None) |
+| transform_csv_to_jsonl | Boolean flag to convert CSV files to JSONL before uploading to S3 |
+| delete_remote | Boolean flag to delete the original file from ShareFile after transfer |
+| delete_local_csv | Boolean flag to delete the local CSV file after transformation |
+
+Additional `EACustomDAG` arguments (e.g. `slack_conn_id`) can be passed as kwargs.
+
+-----
+
+
+
+
+Example Yaml File:
+
+```yaml
+default_args:
+ owner:
+ run_as_user:
+ depends_on_past:
+ start_date:
+ email:
+ email_on_failure: False
+ retries: 0
+ trigger_rule:
+ retry_delay:
+ execution_timeout:
+ sla:
+
+connections:
+ s3:
+ s3_conn_id:
+ s3_bucket :
+ s3_region :
+ base_s3_destination_key:
+ snowflake:
+ snowflake_conn_id:
+ snowflake_stage :
+ snowflake_db :
+ snowflake_schema :
+ sharefile:
+ sharefile_conn_id:
+ sharefile_base_path:
+
+variables:
+ tmp_dir:
+ local_path:
+ schedule_interval:
+ delete_remote:
+
+file_sources:
+ filename1:
+ sharefile_path: /Path/to/document/folder/
+ dest_table: schema.table
+ truncate: True
+ colnames:
+ - col1
+ - col2
+ - col3
+ filename2:
+ sharefile_path: /Path/to/document/folder/
+ dest_table: schema.table
+ truncate: True
+ colnames:
+ - col1
+ - col2
+ - col3
+```
+
# Providers
Finally, this package contains a handful of custom DBT operators to be used as an alternative to PythonOperators.
diff --git a/ea_airflow_util/dags/s3_to_snowflake_dag.py b/ea_airflow_util/dags/s3_to_snowflake_dag.py
index a9e8c71..9513c58 100644
--- a/ea_airflow_util/dags/s3_to_snowflake_dag.py
+++ b/ea_airflow_util/dags/s3_to_snowflake_dag.py
@@ -36,7 +36,7 @@ def __init__(self,
is_manual_upload: bool = False,
pool: str,
- full_replace: bool = False, #TODO once on latest version of airflow, use dagrun parameter to allow full_replace runs even if not set here at dag level
+ full_replace: bool = False,
do_delete_from_source: bool = True,
**kwargs
diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py
new file mode 100644
index 0000000..1c11b37
--- /dev/null
+++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag_builder.py
@@ -0,0 +1,213 @@
+import logging
+import os
+from datetime import datetime
+from typing import Callable, List, Optional, Union
+
+from airflow import DAG
+from airflow.models import Param
+from airflow.operators.python import PythonOperator
+
+from edu_edfi_airflow.callables import s3
+
+from ea_airflow_util.callables.airflow import skip_if_not_in_params_list
+from ea_airflow_util.callables import jsonl, snowflake
+from ea_airflow_util.providers.sharefile.transfers.sharefile_to_disk import SharefileToDiskOperator
+from ea_airflow_util.providers.aws.operators.s3 import S3ToSnowflakeOperator
+from ea_airflow_util.callables.airflow import xcom_pull_template
+from ea_airflow_util.providers.sharefile.hooks.sharefile import SharefileHook
+
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from ea_airflow_util import EACustomDAG
+
+logging.basicConfig(
+ level=logging.INFO,
+ format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+)
+
+class SharefileTransferToSnowflakeDagBuilder:
+    """
+    Build an Airflow DAG that transfers files from ShareFile to local disk, then S3, then Snowflake.
+
+    Parameters:
+    - dag_id (str): ID of the DAG to be created.
+    - airflow_default_args (dict): Default arguments to pass to the DAG.
+    - file_sources (dict): Mapping of file source name to its details. Each entry must contain a
+      'sharefile_path' key; the source name is also used as the Snowflake table name.
+    - local_base_path (str): Base local path for downloading files.
+    - sharefile_conn_id (str): Airflow connection ID for ShareFile.
+    - base_s3_destination_key (str): Base S3 key (path prefix) for uploads.
+    - s3_conn_id (str): Airflow connection ID for AWS S3.
+    - snowflake_conn_id (str): Airflow connection ID for Snowflake.
+    - database (str): Snowflake database name.
+    - schema (str): Snowflake schema name.
+    - full_refresh (bool): Passed to S3ToSnowflakeOperator as `full_refresh`.
+    - schedule_interval (str or timedelta, optional): Airflow schedule interval for the DAG.
+    - transform_csv_to_jsonl (bool, optional): If True, add a CSV-to-JSONL task before the S3 upload.
+    - delete_remote (bool, optional): Passed to SharefileToDiskOperator as `delete_remote`.
+    - delete_local_csv (bool, optional): Passed to the CSV-to-JSONL callable as `delete_csv`.
+    - transfer_s3_to_snowflake (Callable, optional): Callable run in a PythonOperator instead of the default S3ToSnowflakeOperator.
+    - **kwargs: Currently accepted but not used by the constructor.
+    """
+    def __init__(self,
+        dag_id: str,
+        airflow_default_args: dict,
+        file_sources: dict,
+        local_base_path: str,
+        sharefile_conn_id: str,
+        base_s3_destination_key: str,
+        s3_conn_id: str,
+        snowflake_conn_id: str,
+        database: str,
+        schema: str,
+        full_refresh: bool,
+        schedule_interval = None,
+        transform_csv_to_jsonl: bool = False,
+        delete_remote: bool = False,
+        delete_local_csv: bool = False,
+        transfer_s3_to_snowflake: Optional[Callable] = None,
+        **kwargs
+    ):
+        self.dag_id = dag_id
+        self.airflow_default_args = airflow_default_args
+        # Treat a missing mapping as "no sources" so neither the Param nor the build loop fails on None.
+        self.file_sources = file_sources or {}
+        self.schedule_interval = schedule_interval
+
+        self.local_base_path = local_base_path
+        self.transform_csv_to_jsonl = transform_csv_to_jsonl
+        self.delete_local_csv = delete_local_csv
+
+        self.sharefile_conn_id = sharefile_conn_id
+        self.delete_remote = delete_remote
+
+        self.base_s3_destination_key = base_s3_destination_key
+        self.s3_conn_id = s3_conn_id
+
+        self.snowflake_conn_id = snowflake_conn_id
+        self.database = database
+        self.schema = schema
+        self.full_refresh = full_refresh
+        self.transfer_s3_to_snowflake = transfer_s3_to_snowflake
+
+        self.logger = logging.getLogger(self.__class__.__name__)
+
+        self.params_dict = {
+            "file_sources": Param(
+                default=list(self.file_sources),
+                examples=list(self.file_sources),
+                type="array",
+                description="Newline-separated list of file sources to pull from ShareFile",
+            ),
+        }
+
+        # NOTE(review): **kwargs is not forwarded to EACustomDAG, although the README says extra
+        # EACustomDAG arguments (e.g. slack_conn_id) can be passed as kwargs -- confirm intent.
+        self.dag = EACustomDAG(dag_id=self.dag_id,
+            default_args=self.airflow_default_args,
+            schedule_interval=self.schedule_interval,
+            params=self.params_dict,
+            catchup=False
+        )
+
+    def build_python_operator(self,
+        python_callable: Callable,
+        task_id: Optional[str] = None,
+        **kwargs
+    ) -> PythonOperator:
+        """
+        Wrap a Python callable in a PythonOperator attached to this builder's DAG.
+
+        :param python_callable: Callable to run.
+        :param task_id: Task ID; defaults to one derived from the callable's name (must be unique in the DAG).
+        :param kwargs: Forwarded to the callable as `op_kwargs`.
+        :return: The new PythonOperator.
+        """
+        if task_id is None:
+            callable_name = python_callable.__name__.strip('<>')  # Remove brackets around lambdas
+            task_id = f"python_callable__{callable_name}"
+
+        return PythonOperator(
+            task_id=task_id,
+            python_callable=python_callable,
+            op_kwargs=kwargs,
+            dag=self.dag
+        )
+
+    def build_sharefile_to_snowflake_dag(self, **kwargs):
+        """
+        Builds the DAG with tasks for each file source, including:
+        - Conditional execution based on DAG parameters.
+        - Downloading from ShareFile to local disk (supports both folder and single file paths).
+        - Optional transformation from CSV to JSONL (only when `transform_csv_to_jsonl` is True).
+        - Upload to S3.
+        - Load into Snowflake using S3ToSnowflakeOperator, or the custom callable when one was given.
+
+        Returns:
+            airflow.DAG: The constructed Airflow DAG instance.
+        """
+        for file, details in self.file_sources.items():
+            check_if_file_in_param = PythonOperator(
+                task_id=f"check_{file}",
+                python_callable=skip_if_not_in_params_list,
+                op_kwargs={'param_name': "file_sources", 'value': file},
+                dag=self.dag,
+                **kwargs
+            )
+
+            transfer_sharefile_to_disk = SharefileToDiskOperator(
+                task_id=f"transfer_{file}_to_disk",
+                sharefile_conn_id=self.sharefile_conn_id,
+                sharefile_path=details['sharefile_path'],
+                local_path=os.path.join(self.local_base_path, '{{ds_nodash}}', '{{ts_nodash}}', file),
+                delete_remote=self.delete_remote,
+                dag=self.dag,
+                **kwargs
+            )
+
+            transfer_disk_to_s3 = PythonOperator(
+                task_id=f"transfer_{file}_to_s3",
+                python_callable=s3.local_filepath_to_s3,
+                op_kwargs={
+                    'local_filepath': xcom_pull_template(transfer_sharefile_to_disk),
+                    's3_destination_key': f"{self.base_s3_destination_key}/" + '{{ds_nodash}}' + "/" + '{{ts_nodash}}' + f"/{file}",
+                    's3_conn_id': self.s3_conn_id
+                },
+                dag=self.dag,
+                **kwargs
+            )
+
+            if not self.transfer_s3_to_snowflake:
+                transfer_s3_to_snowflake = S3ToSnowflakeOperator(
+                    task_id=f"{file}_s3_to_snowflake",
+                    snowflake_conn_id=self.snowflake_conn_id,
+                    database=self.database,
+                    schema=self.schema,
+                    table_name=file,
+                    s3_destination_key=self.base_s3_destination_key,
+                    full_refresh=self.full_refresh,
+                    dag=self.dag,
+                    **kwargs
+                )
+            else:
+                # Task IDs must be unique within a DAG, so the file source name is part of the ID.
+                transfer_s3_to_snowflake = self.build_python_operator(self.transfer_s3_to_snowflake, task_id=f"{file}_s3_to_snowflake")
+
+            if self.transform_csv_to_jsonl:
+                # Create the transform task only when it is wired in: an unwired task with dag= set would still run on its own.
+                transform_to_jsonl = PythonOperator(
+                    task_id=f"transform_{file}_to_jsonl",
+                    python_callable=jsonl.translate_csv_file_to_jsonl,
+                    op_kwargs={
+                        'local_path': xcom_pull_template(transfer_sharefile_to_disk),
+                        'output_path': None,
+                        'delete_csv': self.delete_local_csv,
+                        'metadata_dict': {'file_source': file},
+                    },
+                    dag=self.dag,
+                    **kwargs
+                )
+                check_if_file_in_param >> transfer_sharefile_to_disk >> transform_to_jsonl >> transfer_disk_to_s3 >> transfer_s3_to_snowflake
+            else:
+                check_if_file_in_param >> transfer_sharefile_to_disk >> transfer_disk_to_s3 >> transfer_s3_to_snowflake
+
+        return self.dag
\ No newline at end of file
diff --git a/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py b/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py
index 70184b9..6b84afd 100644
--- a/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py
+++ b/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py
@@ -50,92 +50,127 @@ def execute(self, **context):
sf_hook = SharefileHook(sharefile_conn_id=self.sharefile_conn_id)
sf_hook.get_conn()
- # get the item id of the remote path, find all files within that path (up to 1000)
- base_path_id = sf_hook.get_path_id(self.sharefile_path)
- remote_files = sf_hook.find_files(base_path_id)
-
- # check whether we found anything
- if len(remote_files) == 0:
- self.log.info("No files on FTP")
- raise AirflowSkipException
-
- # extract relevant file details
- files = []
- for res in remote_files:
- file_details = {
- 'file_name': res['FileName'],
- 'size': res['Size'],
- 'parent_id': res['ParentID'],
- 'file_path_no_base': res['ParentSemanticPath'].replace(self.sharefile_path, ''),
- 'file_path_ftp': res['ParentSemanticPath'],
- 'item_id': res['ItemID']
- }
-
-
- files.append(file_details)
-
- if self.most_recent_file:
- # of files found in directory, find the one with the most recent edit timestamp
- max_timestamp = None
- chosen_file = []
- for res in files:
- # seem to be cases where search is out of date and returns items that don't exist
- try:
- item_info = sf_hook.item_info(res['item_id'])
- except:
- # if the item fails to fetch item info, it probably doesn't exist, so can't be most recent
- continue
- # grab last modified, compare to current max known
- item_last_modified = item_info['ProgenyEditDate']
- if max_timestamp is None or item_last_modified > max_timestamp:
- max_timestamp = item_last_modified
- chosen_file = [res]
- # overwrite files list with singular chosen item
- files = chosen_file
-
-
- # for all files, move to local
- num_successes = 0
-
- for file in files:
+        # get the item ID of the provided path (either file or folder)
+ item_id = sf_hook.get_path_id(self.sharefile_path)
+ if not item_id:
+ raise AirflowException(f"Cound not find item ID for path: {self.sharefile_path}")
+
+ try:
+ item_info = sf_hook.item_info(item_id)
+ except Exception as e:
+ raise AirflowException(f"Failed to retrieve metadata for item {item_id}: {e}")
+
+ # Case 1: single file
+ if item_info["odata.type"] == "ShareFile.Api.Models.File":
+ file_name = item_info['FileName']
+ full_local_path = os.path.join(self.local_path, file_name)
- remote_file = os.path.join(file['file_path_ftp'], file['file_name'])
- self.log.info("Attempting to get file " + remote_file)
-
- # lower filename and replace spaces with underscores
- file['file_name'] = file['file_name'].lower().replace(' ', '_')
-
- # check to see if there is other metadata needed in local path and if not, add filename to local path
- if file['parent_id'] == base_path_id:
- full_local_path = os.path.join(self.local_path, file['file_name'])
- else:
- full_local_path = os.path.join(self.local_path, file['file_path_no_base'], file['file_name'])
-
- # create dir (works if there is a file name or not)
os.makedirs(os.path.dirname(full_local_path), exist_ok=True)
- # download the file
try:
- sf_hook.download_to_disk(item_id=file['item_id'], local_path=full_local_path)
-
+ sf_hook.download_to_disk(item_id=item_id, local_path=full_local_path)
if self.delete_remote:
- sf_hook.delete(file['item_id'])
-
- num_successes += 1
-
+ sf_hook.delete(item_id)
+
+ return self.local_path
+
except Exception as err:
- self.log.error(f'Failed to get file with message: {err}')
-
+ self.log.error(f"Failed to download file {self.sharefile_path}: {err}")
if slack_conn_id := context["dag"].user_defined_macros.get("slack_conn_id"):
slack.slack_alert_download_failure(
context=context, http_conn_id=slack_conn_id,
- remote_path=remote_file, local_path=full_local_path, error=err
+ remote_path=self.sharefile_path, local_path=full_local_path, error=err
)
+ raise AirflowException("Single file transfer failed")
+
+ else:
+
+ # get the item id of the remote path, find all files within that path (up to 1000)
+ base_path_id = item_id
+ remote_files = sf_hook.find_files(base_path_id)
+
+ # check whether we found anything
+ if len(remote_files) == 0:
+ self.log.info("No files on FTP")
+ raise AirflowSkipException
+
+ # extract relevant file details
+ files = []
+ for res in remote_files:
+ file_details = {
+ 'file_name': res['FileName'],
+ 'size': res['Size'],
+ 'parent_id': res['ParentID'],
+ 'file_path_no_base': res['ParentSemanticPath'].replace(self.sharefile_path, ''),
+ 'file_path_ftp': res['ParentSemanticPath'],
+ 'item_id': res['ItemID']
+ }
+
+
+ files.append(file_details)
+
+ if self.most_recent_file:
+ # of files found in directory, find the one with the most recent edit timestamp
+ max_timestamp = None
+ chosen_file = []
+ for res in files:
+ # seem to be cases where search is out of date and returns items that don't exist
+ try:
+ item_info = sf_hook.item_info(res['item_id'])
+ except:
+ # if the item fails to fetch item info, it probably doesn't exist, so can't be most recent
+ continue
+ # grab last modified, compare to current max known
+ item_last_modified = item_info['ProgenyEditDate']
+ if max_timestamp is None or item_last_modified > max_timestamp:
+ max_timestamp = item_last_modified
+ chosen_file = [res]
+ # overwrite files list with singular chosen item
+ files = chosen_file
+
+
+ # for all files, move to local
+ num_successes = 0
+
+ for file in files:
+
+ remote_file = os.path.join(file['file_path_ftp'], file['file_name'])
+ self.log.info("Attempting to get file " + remote_file)
+
+ # lower filename and replace spaces with underscores
+ file['file_name'] = file['file_name'].lower().replace(' ', '_')
+
+ # check to see if there is other metadata needed in local path and if not, add filename to local path
+ if file['parent_id'] == base_path_id:
+ full_local_path = os.path.join(self.local_path, file['file_name'])
+ else:
+ full_local_path = os.path.join(self.local_path, file['file_path_no_base'], file['file_name'])
+
+ # create dir (works if there is a file name or not)
+ os.makedirs(os.path.dirname(full_local_path), exist_ok=True)
+
+ # download the file
+ try:
+ sf_hook.download_to_disk(item_id=file['item_id'], local_path=full_local_path)
+
+ if self.delete_remote:
+ sf_hook.delete(file['item_id'])
- continue
+ num_successes += 1
+
+ except Exception as err:
+ self.log.error(f'Failed to get file with message: {err}')
+
+ if slack_conn_id := context["dag"].user_defined_macros.get("slack_conn_id"):
+ slack.slack_alert_download_failure(
+ context=context, http_conn_id=slack_conn_id,
+ remote_path=remote_file, local_path=full_local_path, error=err
+ )
+
+ continue
- if num_successes == 0:
- raise AirflowException("Failed transfer from ShareFile to local: no files transferred successfully!")
+ if num_successes == 0:
+ raise AirflowException("Failed transfer from ShareFile to local: no files transferred successfully!")
- return self.local_path
+ return self.local_path