diff --git a/README.md b/README.md
index 537f365..81bff14 100644
--- a/README.md
+++ b/README.md
@@ -58,6 +58,67 @@ Raise an error if a name-collision occurs after formatting.
+## ea_csv
+Helpers for working with CSV files.
+
+
+See more:
+
+-----
+
+### txt_to_csv
+Convert a txt file to a csv.
+
+Args:
+- file_in (str): A path to a txt file.
+- file_out (str): A path to a csv file. If 'None', then the input file path
+ is used, with its extension replaced by .csv.
+- delimiter (str): A txt file delimiter.
+- has_header (bool): If True, use the first row of the txt file as a column
+ header. If False, insert a column header using the column_names arg.
+ Default is True.
+- column_names (list[str]): An ordered list of column names to use in the
+ output csv. If 'None' and has_header is False, insert an ordered,
+ integer column header (e.g. 1, 2, ..., n where n is the number of
+ columns).
+- delete_txt (bool): If True, delete the input txt file.
+
+Returns:
+- file_out (str): A csv file path.
+
+### txt_files_to_csv
+Convert all txt files in a directory to csv files. Also works with a
+single txt file path and can be optionally configured to process files in
+all subdirectories.
+
+Args:
+- path_in (str): A file or directory path containing zero or more txt files.
+- path_out (str): A file or directory path to write csv file(s) to. If
+ 'None', then the input path is used. Note that a file will retain its
+ original name, except with a .csv extension.
+- delimiter (str): A txt file delimiter. Note that this function assumes
+ that all txt files in an input directory use the same delimiter.
+- has_header (bool): If True, use the first row of the txt file(s) as a
+ column header. If False, insert a column header using the column_names
+ arg. Default is False.
+- column_names (list[str]): An ordered list of column names to use in the
+ output csv(s). If 'None' and has_header is False, insert an ordered,
+ integer column header (e.g. 1, 2, ..., n where n is the number of
+ columns).
+- delete_txt (bool): If True, delete all of the input txt files.
+- include_subdirs (bool): If True, process all files in all subdirectories.
+ If False, only process files in the top level of the specified
+ directory. Default is False.
+
+Returns:
+- path_out (str): A file or directory path containing the output csv
+ file(s).
+
+-----
+
+
+
+
## ftp
FTP- and SFTP-utility helpers
@@ -623,7 +684,113 @@ For example, `/ed-fi/apiClients/districts-2425-ds5/{tenant_code}/prod/Stadium` w
+## SharefileToSnowflakeDag
+`SharefileToSnowflakeDag` is an Airflow DAG that automates the process of
+transferring files from ShareFile to Snowflake. The DAG retrieves txt and csv
+files from a specified ShareFile location, transforms them into JSONL format,
+uploads the files to an S3 bucket, and finally loads the data into a Snowflake
+database.
+
+
+Arguments:
+-----
+
+| Argument | Description |
+|-------------------------|--------------------------------------------------------------------------|
+| sharefile_conn_id | A Sharefile connection ID. |
+| local_base_path | A base local path for downloading files. |
+| s3_conn_id | An Airflow connection ID for AWS S3. |
+| s3_bucket | An S3 bucket where to stage files. |
+| snowflake_conn_id | An Airflow connection ID for Snowflake. |
+| snowflake_database | A Snowflake database name. |
+| snowflake_schema | A Snowflake schema name. |
+| **kwargs | Additional arguments to pass to the Airflow DAG. |
+
+-----
+
+
+
+
+Methods:
+
+-----
+
+**build_task_group()**
+
+Builds a task group to load data from csv and txt files in a
+Sharefile directory to a Snowflake table.
+
+Note that the arguments specified here are relative to the class
+arguments provided at instantiation. For example, the
+sharefile_source_path argument is relative to the sharefile_conn_id
+specified at the class level.
+
+| Argument | Description |
+|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| group_id | A name for the Airflow task group. |
+| sharefile_source_path | A Sharefile path to extract data from. |
+| sharefile_processed_path | A Sharefile path to move files to after they have been processed. If None, do not move processed files. Default is None. |
+| local_rel_path | A local relative path to stage data in with respect to the class's local_base_path. This is also used to determine the staging S3 destination relative to the class's S3 bucket. |
+| snowflake_table | A Snowflake table name to write data to. |
+| txt_delimiter | A text delimiter used in the txt files to load. Default is ','. |
+| txt_has_header | If True, uses the first row of the txt file as a column header. If False, inserts a column header based on the txt columns arg. Default is True. |
+| txt_columns | An ordered list of column names in the txt files to load. If None and txt_has_header is False, then columns are labeled using integers (i.e. 1, 2, 3, ..., n, where n is the number of columns). Default is None. |
+| custom_metadata | A mapping of metadata field names to values to include in the target Snowflake table. |
+| full_refresh | If True, performs a full refresh load in Snowflake. Default is False. |
+| csv_encoding | Optional encoding to use for csv files. Default is 'utf-8'. |
+| **kwargs | Additional keyword arguments to pass to the task group. |
+
+-----
+
+
+
+
+Example Yaml File:
+
+```yaml
+default_args: &default_args
+ owner:
+ run_as_user:
+ depends_on_past:
+ start_date:
+ email:
+ email_on_failure: False
+ retries: 0
+ trigger_rule:
+ retry_delay:
+ execution_timeout:
+ sla:
+
+### Sharefile to Snowflake DAGs
+sharefile_to_snowflake_dags__default_args: &sharefile_to_snowflake_dags__default_args
+ sharefile_conn_id:
+ # Null here means processed files will not be moved in Sharefile
+ sharefile_processed_dir:
+
+ local_base_path:
+
+ s3_conn_id:
+ s3_bucket:
+
+ snowflake_conn_id:
+ snowflake_database:
+ snowflake_schema:
+
+ airflow_default_args: *default_args
+ schedule_interval: null
+
+sharefile_to_snowflake_dags:
+ resource_1:
+ <<: *sharefile_to_snowflake_dags__default_args
+ sharefile_base_path: path/to/folder
+ snowflake_table:
+ resource_2:
+ <<: *sharefile_to_snowflake_dags__default_args
+ sharefile_base_path: path/to/folder
+ snowflake_table:
+```
+
# Providers
Finally, this package contains a handful of custom DBT operators to be used as an alternative to PythonOperators.
diff --git a/ea_airflow_util/__init__.py b/ea_airflow_util/__init__.py
index d0d6d5f..a1bff3c 100644
--- a/ea_airflow_util/__init__.py
+++ b/ea_airflow_util/__init__.py
@@ -8,6 +8,7 @@
from ea_airflow_util.dags.dbt_snapshot_dag import DbtSnapshotDag
from ea_airflow_util.dags.sftp_to_snowflake_dag import SFTPToSnowflakeDag
from ea_airflow_util.dags.sharefile_custom_users_dag import LoadSharefileCustomUsersDag
+from ea_airflow_util.dags.sharefile_to_snowflake_dag import SharefileToSnowflakeDag
from ea_airflow_util.callables.airflow import xcom_pull_template
from ea_airflow_util.callables import slack as slack_callbacks
diff --git a/ea_airflow_util/callables/ea_csv.py b/ea_airflow_util/callables/ea_csv.py
new file mode 100644
index 0000000..7b29d9c
--- /dev/null
+++ b/ea_airflow_util/callables/ea_csv.py
@@ -0,0 +1,128 @@
+import os
+import pandas as pd
+
+
+def txt_to_csv(
+ file_in,
+ file_out=None,
+ delimiter=',',
+ has_header=True,
+ column_names=None,
+ delete_txt=False
+):
+ """Convert a txt file to a csv.
+
+ Args:
+ - file_in (str): A path to a txt file.
+ - file_out (str): A path to a csv file. If 'None', then the input file path
+ is used.
+ - delimiter (str): A txt file delimiter.
+ - has_header (bool): If True, use the first row of the txt file as a column
+ header. If False, insert a column header using the column_names arg.
+ Default is True.
+ - column_names (list[str]): An ordered list of column names to use in the
+ output csv. If 'None' and has_header is False, insert an ordered,
+ integer column header (e.g. 1, 2, ..., n where n is the number of
+ columns).
+ - delete_txt (bool): If True, delete the input txt file.
+
+ Returns:
+ - file_out (str): A csv file path.
+ """
+
+ if has_header == True:
+ # Force str dtype, otherwise pandas will do things like cast int's to
+ # floats
+ df = pd.read_csv(file_in, delimiter=delimiter, dtype=str)
+
+ elif has_header == False and column_names != None:
+ df = pd.read_csv(file_in, delimiter=delimiter, dtype=str, header=None)
+ df.columns = column_names
+
+ elif has_header == False and column_names == None:
+ df = pd.read_csv(file_in, delimiter=delimiter, dtype=str, header=None)
+ # 1-indexed column labels can simplify downstream processing
+ df.columns = df.columns + 1
+
+ if file_out == None:
+ file_out = file_in[:-4] + '.csv'
+
+ df.to_csv(file_out, index=False)
+
+ if delete_txt:
+ os.remove(file_in)
+
+ return file_out
+
+
+def txt_files_to_csv(
+ path_in,
+ path_out=None,
+ delimiter=',',
+ has_header=False,
+ column_names=None,
+ delete_txt=False,
+ include_subdirs=False
+):
+ """Convert all txt files in a directory to csv files. Also works with a
+ single txt file path and can be optionally configured to process files in
+ all subdirectories.
+
+ Args:
+ - path_in (str): A file or directory path containing zero or more txt files.
+ - path_out (str): A file or directory path to write csv file(s) to. If
+ 'None', then the input path is used. Note that a file will retain its
+ original name, except with a .csv extension.
+ - delimiter (str): A txt file delimiter. Note that this function assumes
+ that all txt files in an input directory use the same delimiter.
+ - has_header (bool): If True, use the first row of the txt file(s) as a
+ column header. If False, insert a column header using the column_names
+ arg. Default is False.
+ - column_names (list[str]): An ordered list of column names to use in the
+ output csv(s). If 'None' and has_header is False, insert an ordered,
+ integer column header (e.g. 1, 2, ..., n where n is the number of
+ columns).
+ - delete_txt (bool): If True, delete all of the input txt files.
+ - include_subdirs (bool): If True, process all files in all subdirectories.
+ If False, only process files in the top level of the specified
+ directory. Default is False.
+
+ Returns:
+ - path_out (str): A file or directory path containing the output csv
+ file(s).
+ """
+
+ for root, _, files in os.walk(path_in):
+
+ for file in files:
+
+ # Only process txt files
+ if file[-4:] != '.txt':
+ continue
+
+ filepath_in = os.path.join(root, file)
+
+ if path_out == None:
+ dir_out = root
+ else:
+ dir_out = path_out
+
+ filename_out = file[:-4] + '.csv'
+ filepath_out = os.path.join(dir_out, filename_out)
+
+ txt_to_csv(
+ file_in=filepath_in,
+ file_out=filepath_out,
+ delimiter=delimiter,
+ has_header=has_header,
+ column_names=column_names,
+ delete_txt=delete_txt
+ )
+
+ if include_subdirs == False:
+ break
+
+ if path_out == None:
+ path_out = path_in
+
+ return path_out
diff --git a/ea_airflow_util/callables/jsonl.py b/ea_airflow_util/callables/jsonl.py
index e02123a..8e5dd0e 100644
--- a/ea_airflow_util/callables/jsonl.py
+++ b/ea_airflow_util/callables/jsonl.py
@@ -60,6 +60,7 @@ def translate_csv_file_to_jsonl(
delete_csv : bool = False,
metadata_dict: Optional[dict] = None,
to_snake_case: bool = False,
+ csv_encoding: str = 'utf-8',
**kwargs
):
"""
@@ -95,7 +96,7 @@ def translate_csv_file_to_jsonl(
output_path_new = output_path
try:
- with open(full_local_path, 'r') as reader:
+ with open(full_local_path, 'r', encoding=csv_encoding) as reader:
json_records = csv.DictReader(reader)
serialize_json_records_to_disk(json_records, output_path_new, "w", metadata_dict, to_snake_case, **kwargs)
except UnicodeDecodeError:
diff --git a/ea_airflow_util/dags/s3_to_snowflake_dag.py b/ea_airflow_util/dags/s3_to_snowflake_dag.py
index a9e8c71..9513c58 100644
--- a/ea_airflow_util/dags/s3_to_snowflake_dag.py
+++ b/ea_airflow_util/dags/s3_to_snowflake_dag.py
@@ -36,7 +36,7 @@ def __init__(self,
is_manual_upload: bool = False,
pool: str,
- full_replace: bool = False, #TODO once on latest version of airflow, use dagrun parameter to allow full_replace runs even if not set here at dag level
+ full_replace: bool = False,
do_delete_from_source: bool = True,
**kwargs
diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag.py
new file mode 100644
index 0000000..f6513ff
--- /dev/null
+++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag.py
@@ -0,0 +1,208 @@
+from airflow.operators.python import PythonOperator
+from airflow.utils.task_group import TaskGroup
+
+from ea_airflow_util.callables import jsonl, s3, ea_csv
+from ea_airflow_util.callables.airflow import xcom_pull_template
+from ea_airflow_util.callables.sharefile import sharefile_copy_file
+from ea_airflow_util.providers.sharefile.transfers.sharefile_to_disk import SharefileToDiskOperator
+from ea_airflow_util.providers.aws.operators.s3 import S3ToSnowflakeOperator
+from ea_airflow_util.dags.ea_custom_dag import EACustomDAG
+
+
+class SharefileToSnowflakeDag:
+ """
+ A class to build an Airflow DAG to extract data from flat files (csv and
+ txt) on ShareFile to tables in a Snowflake database schema.
+
+ Parameters:
+ - sharefile_conn_id (str): A Sharefile connection ID.
+ - local_base_path (str): A base local path for downloading files.
+ - s3_conn_id (str): An Airflow connection ID for AWS S3.
+ - s3_bucket (str): An S3 bucket where to stage files.
+ - snowflake_conn_id (str): An Airflow connection ID for Snowflake.
+ - snowflake_database (str): A Snowflake database name.
+ - snowflake_schema (str): A Snowflake schema name.
+ - **kwargs: Additional arguments to pass to the Airflow DAG.
+ """
+
+ def __init__(
+ self,
+
+ sharefile_conn_id: str,
+
+ local_base_path: str,
+
+ s3_conn_id: str,
+ s3_bucket: str,
+
+ snowflake_conn_id: str,
+ snowflake_database: str,
+ snowflake_schema: str,
+
+ **kwargs
+ ) -> None:
+ self.sharefile_conn_id = sharefile_conn_id
+
+ self.local_base_path = local_base_path
+
+ self.s3_conn_id = s3_conn_id
+ self.s3_bucket = s3_bucket
+
+ self.snowflake_conn_id = snowflake_conn_id
+ self.snowflake_database = snowflake_database
+ self.snowflake_schema = snowflake_schema
+
+ self.dag = EACustomDAG(**kwargs)
+
+ self.sharefile_task_groups = []
+
+ def build_task_group(
+ self,
+ group_id,
+ sharefile_source_path,
+ local_rel_path,
+ snowflake_table,
+ sharefile_processed_path=None,
+ txt_delimiter=',',
+ txt_has_header=True,
+ txt_columns=None,
+ custom_metadata={},
+ full_refresh=False,
+ csv_encoding='utf-8',
+ **kwargs
+ ):
+ """Builds a task group to load data from csv and txt files in a
+ Sharefile directory to a Snowflake table.
+
+ Note that the arguments specified here are relative to the class
+ arguments provided at instantiation. For example, the
+ sharefile_source_path argument is relative to the sharefile_conn_id
+ specified at the class level.
+
+ Parameters:
+ - group_id (str): A name for the Airflow task group.
+ - sharefile_source_path (str): A Sharefile path to extract data from.
+ - sharefile_processed_path (str): A Sharefile path to move files to
+ after they have been processed. If None, do not move processed
+ files. Default is None.
+ - local_rel_path (str): A local relative path to stage data in with
+ respect to the class's local_base_path. This is also used to
+ determine the staging S3 destination relative to the class's S3
+ bucket.
+ - snowflake_table (str): A Snowflake table name to write data to.
+ - txt_delimiter (str): A text delimiter used in the txt files to load.
+ Default is ','.
+ - txt_has_header (bool): If True, uses the first row of the txt file as
+ a column header. If False, inserts a column header based on the txt
+ columns arg. Default is True.
+ - txt_columns (list[str]): An ordered list of column names in the txt files
+ to load. If None and txt_has_header is False, then columns are
+ labeled using integers (i.e. 1, 2, 3, ..., n, where n is the number
+ of columns). Default is None.
+ - custom_metadata (dict): A mapping of metadata field names to values
+ to include in the target Snowflake table.
+ - full_refresh (bool): If True, performs a full refresh load in
+ Snowflake. Default is False.
+ - csv_encoding (str): Optional encoding to use for csv files. Default is
+ 'utf-8'.
+ - **kwargs: Additional keyword arguments to pass to the task group.
+ """
+
+ with TaskGroup(group_id=group_id, dag=self.dag, **kwargs) as task_group:
+
+ sharefile_to_disk = SharefileToDiskOperator(
+ task_id=f"sharefile_to_disk",
+ sharefile_conn_id=self.sharefile_conn_id,
+ sharefile_path=sharefile_source_path,
+ local_path=f'{self.local_base_path}/{local_rel_path}',
+ delete_remote=False,
+ dag=self.dag
+ )
+
+ # TODO: Make this task a customizable preprocessing operator that
+ # gets passed to the build_task_group method
+ txt_to_csv = PythonOperator(
+ task_id=f"txt_to_csv",
+ python_callable=ea_csv.txt_files_to_csv,
+ op_kwargs={
+ 'path_in': xcom_pull_template(sharefile_to_disk),
+ 'path_out': None,
+ 'delimiter': txt_delimiter,
+ 'has_header': txt_has_header,
+ 'column_names': txt_columns,
+ # S3 to Snowflake task breaks if non-jsonl file retained
+ 'delete_txt': True,
+ 'include_subdirs': False,
+ },
+ dag=self.dag
+ )
+
+ csv_to_jsonl = PythonOperator(
+ task_id=f"csv_to_jsonl",
+ python_callable=jsonl.translate_csv_file_to_jsonl,
+ op_kwargs={
+ 'local_path': xcom_pull_template(txt_to_csv),
+ 'output_path': None,
+ # S3 to Snowflake task breaks if non-jsonl file retained
+ 'delete_csv': True,
+ 'csv_encoding': csv_encoding,
+ },
+ dag=self.dag
+ )
+
+ disk_to_s3 = PythonOperator(
+ task_id=f"disk_to_s3",
+ python_callable=s3.disk_to_s3,
+ op_kwargs={
+ 'local_path': xcom_pull_template(csv_to_jsonl),
+ 's3_conn_id': self.s3_conn_id,
+ 'bucket': self.s3_bucket,
+ 'base_dir': self.local_base_path,
+ 'delete_local': False,
+ },
+ dag=self.dag
+ )
+
+ s3_to_snowflake = S3ToSnowflakeOperator(
+ task_id=f"s3_to_snowflake",
+ snowflake_conn_id=self.snowflake_conn_id,
+ database=self.snowflake_database,
+ schema=self.snowflake_schema,
+ table_name=snowflake_table,
+ custom_metadata_columns=custom_metadata,
+ s3_destination_key=xcom_pull_template(disk_to_s3),
+ full_refresh=full_refresh,
+ dag=self.dag
+ )
+
+ if sharefile_processed_path is None:
+ (
+ sharefile_to_disk
+ >> txt_to_csv
+ >> csv_to_jsonl
+ >> disk_to_s3
+ >> s3_to_snowflake
+ )
+ else:
+ # Move processed files to specific Sharefile location
+ move_to_processed = PythonOperator(
+ task_id=f'move_to_processed',
+ python_callable=sharefile_copy_file,
+ op_kwargs={
+ 'sharefile_conn_id': self.sharefile_conn_id,
+ 'sharefile_path': sharefile_source_path,
+ 'sharefile_dest_dir': sharefile_processed_path,
+ 'delete_source': True,
+ },
+ dag=self.dag
+ )
+ (
+ sharefile_to_disk
+ >> txt_to_csv
+ >> csv_to_jsonl
+ >> disk_to_s3
+ >> s3_to_snowflake
+ >> move_to_processed
+ )
+
+ self.sharefile_task_groups.append(task_group)
diff --git a/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py b/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py
index 70184b9..6b84afd 100644
--- a/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py
+++ b/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py
@@ -50,92 +50,127 @@ def execute(self, **context):
sf_hook = SharefileHook(sharefile_conn_id=self.sharefile_conn_id)
sf_hook.get_conn()
- # get the item id of the remote path, find all files within that path (up to 1000)
- base_path_id = sf_hook.get_path_id(self.sharefile_path)
- remote_files = sf_hook.find_files(base_path_id)
-
- # check whether we found anything
- if len(remote_files) == 0:
- self.log.info("No files on FTP")
- raise AirflowSkipException
-
- # extract relevant file details
- files = []
- for res in remote_files:
- file_details = {
- 'file_name': res['FileName'],
- 'size': res['Size'],
- 'parent_id': res['ParentID'],
- 'file_path_no_base': res['ParentSemanticPath'].replace(self.sharefile_path, ''),
- 'file_path_ftp': res['ParentSemanticPath'],
- 'item_id': res['ItemID']
- }
-
-
- files.append(file_details)
-
- if self.most_recent_file:
- # of files found in directory, find the one with the most recent edit timestamp
- max_timestamp = None
- chosen_file = []
- for res in files:
- # seem to be cases where search is out of date and returns items that don't exist
- try:
- item_info = sf_hook.item_info(res['item_id'])
- except:
- # if the item fails to fetch item info, it probably doesn't exist, so can't be most recent
- continue
- # grab last modified, compare to current max known
- item_last_modified = item_info['ProgenyEditDate']
- if max_timestamp is None or item_last_modified > max_timestamp:
- max_timestamp = item_last_modified
- chosen_file = [res]
- # overwrite files list with singular chosen item
- files = chosen_file
-
-
- # for all files, move to local
- num_successes = 0
-
- for file in files:
+ # get the item ID of the provided path (either file or folder)
+ item_id = sf_hook.get_path_id(self.sharefile_path)
+ if not item_id:
+ raise AirflowException(f"Cound not find item ID for path: {self.sharefile_path}")
+
+ try:
+ item_info = sf_hook.item_info(item_id)
+ except Exception as e:
+ raise AirflowException(f"Failed to retrieve metadata for item {item_id}: {e}")
+
+ # Case 1: single file
+ if item_info["odata.type"] == "ShareFile.Api.Models.File":
+ file_name = item_info['FileName']
+ full_local_path = os.path.join(self.local_path, file_name)
- remote_file = os.path.join(file['file_path_ftp'], file['file_name'])
- self.log.info("Attempting to get file " + remote_file)
-
- # lower filename and replace spaces with underscores
- file['file_name'] = file['file_name'].lower().replace(' ', '_')
-
- # check to see if there is other metadata needed in local path and if not, add filename to local path
- if file['parent_id'] == base_path_id:
- full_local_path = os.path.join(self.local_path, file['file_name'])
- else:
- full_local_path = os.path.join(self.local_path, file['file_path_no_base'], file['file_name'])
-
- # create dir (works if there is a file name or not)
os.makedirs(os.path.dirname(full_local_path), exist_ok=True)
- # download the file
try:
- sf_hook.download_to_disk(item_id=file['item_id'], local_path=full_local_path)
-
+ sf_hook.download_to_disk(item_id=item_id, local_path=full_local_path)
if self.delete_remote:
- sf_hook.delete(file['item_id'])
-
- num_successes += 1
-
+ sf_hook.delete(item_id)
+
+ return self.local_path
+
except Exception as err:
- self.log.error(f'Failed to get file with message: {err}')
-
+ self.log.error(f"Failed to download file {self.sharefile_path}: {err}")
if slack_conn_id := context["dag"].user_defined_macros.get("slack_conn_id"):
slack.slack_alert_download_failure(
context=context, http_conn_id=slack_conn_id,
- remote_path=remote_file, local_path=full_local_path, error=err
+ remote_path=self.sharefile_path, local_path=full_local_path, error=err
)
+ raise AirflowException("Single file transfer failed")
+
+ else:
+
+ # get the item id of the remote path, find all files within that path (up to 1000)
+ base_path_id = item_id
+ remote_files = sf_hook.find_files(base_path_id)
+
+ # check whether we found anything
+ if len(remote_files) == 0:
+ self.log.info("No files on FTP")
+ raise AirflowSkipException
+
+ # extract relevant file details
+ files = []
+ for res in remote_files:
+ file_details = {
+ 'file_name': res['FileName'],
+ 'size': res['Size'],
+ 'parent_id': res['ParentID'],
+ 'file_path_no_base': res['ParentSemanticPath'].replace(self.sharefile_path, ''),
+ 'file_path_ftp': res['ParentSemanticPath'],
+ 'item_id': res['ItemID']
+ }
+
+
+ files.append(file_details)
+
+ if self.most_recent_file:
+ # of files found in directory, find the one with the most recent edit timestamp
+ max_timestamp = None
+ chosen_file = []
+ for res in files:
+ # seem to be cases where search is out of date and returns items that don't exist
+ try:
+ item_info = sf_hook.item_info(res['item_id'])
+ except:
+ # if the item fails to fetch item info, it probably doesn't exist, so can't be most recent
+ continue
+ # grab last modified, compare to current max known
+ item_last_modified = item_info['ProgenyEditDate']
+ if max_timestamp is None or item_last_modified > max_timestamp:
+ max_timestamp = item_last_modified
+ chosen_file = [res]
+ # overwrite files list with singular chosen item
+ files = chosen_file
+
+
+ # for all files, move to local
+ num_successes = 0
+
+ for file in files:
+
+ remote_file = os.path.join(file['file_path_ftp'], file['file_name'])
+ self.log.info("Attempting to get file " + remote_file)
+
+ # lower filename and replace spaces with underscores
+ file['file_name'] = file['file_name'].lower().replace(' ', '_')
+
+ # check to see if there is other metadata needed in local path and if not, add filename to local path
+ if file['parent_id'] == base_path_id:
+ full_local_path = os.path.join(self.local_path, file['file_name'])
+ else:
+ full_local_path = os.path.join(self.local_path, file['file_path_no_base'], file['file_name'])
+
+ # create dir (works if there is a file name or not)
+ os.makedirs(os.path.dirname(full_local_path), exist_ok=True)
+
+ # download the file
+ try:
+ sf_hook.download_to_disk(item_id=file['item_id'], local_path=full_local_path)
+
+ if self.delete_remote:
+ sf_hook.delete(file['item_id'])
- continue
+ num_successes += 1
+
+ except Exception as err:
+ self.log.error(f'Failed to get file with message: {err}')
+
+ if slack_conn_id := context["dag"].user_defined_macros.get("slack_conn_id"):
+ slack.slack_alert_download_failure(
+ context=context, http_conn_id=slack_conn_id,
+ remote_path=remote_file, local_path=full_local_path, error=err
+ )
+
+ continue
- if num_successes == 0:
- raise AirflowException("Failed transfer from ShareFile to local: no files transferred successfully!")
+ if num_successes == 0:
+ raise AirflowException("Failed transfer from ShareFile to local: no files transferred successfully!")
- return self.local_path
+ return self.local_path