diff --git a/README.md b/README.md index 537f365..5c17e91 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,67 @@ Raise an error if a name-collision occurs after formatting. +## ea_csv +Helpers for working with csv's. + +
+See more: + +----- + +### txt_to_csv +Convert a txt file to a csv. + +Args: +- file_in (str): A path to a txt file. +- file_out (str): A path to a csv file. If 'None', then the input file path + is used. +- delimiter (str): A txt file delimiter. +- has_header (bool): If True, use the first row of the txt file as a column + header. If False, insert a column header using the column_names arg. + Default is True. +- column_names (list[str]): An ordered list of column names to use in the + output csv. If 'None' and has_header is False, insert an ordered, + integer column header (e.g. 1, 2, ..., n where n is the number of + columns). +- delete_txt (bool): If True, delete the input txt file. + +Returns: +- file_out (str): A csv file path. + +### txt_files_to_csv +Convert all txt files in a directory to csv files. Also works with a +single txt file path and can be optionally configured to process files in +all subdirectories. + +Args: +- path_in (str): A file or directory path containing zero or more txt files. +- path_out (str): A file or directory path to write csv file(s) to. If + 'None', then the input path is used. Note that a file will retain its + original names except with a .csv extension. +- delimiter (str): A txt file delimiter. Note that this function assumes + that all txt files in an input directory use the same delimiter. +- has_header (bool): If True, use the first row of the txt file(s) as a + column header. If False, insert a column header using the column_names + arg. Default is True. +- column_names (list[str]): An ordered list of column names to use in the + output csv(s). If 'None' and has_header is False, insert an ordered, + integer column header (e.g. 1, 2, ..., n where n is the number of + columns). +- delete_txt (bool): If True, delete all of the input txt files. +- include_subdirs (bool): If True, process all files in all subdirectories. + If False, only process files in the top level of the specified + directory. Default is False. + +Returns: +- path_out (str): A file or directory path containing the output csv + file(s). + +----- + +
+ + ## ftp FTP- and SFTP-utility helpers @@ -623,7 +684,146 @@ For example, `/ed-fi/apiClients/districts-2425-ds5/{tenant_code}/prod/Stadium` w +## SharefileToSnowflakeDag +`SharefileToSnowflakeDag` is an Airflow DAG that automates the process of +transferring files from ShareFile to Snowflake. The DAG downloads files from a +specified ShareFile location, applies an optional preprocessing operator (such +as converting txt files to csv), converts local csv's into JSONL format, uploads +the files to an S3 bucket, loads the data into a Snowflake database, and then +optionally deletes local files and/or moves processed files in Sharefile to a +configured location. + +
+Arguments: +----- + +| Argument | Description | +|-------------------------|--------------------------------------------------------------------------| +| sharefile_conn_id | A Sharefile connection ID. | +| local_base_path | A base local path for downloading files. | +| s3_conn_id | An Airflow connection ID for AWS S3. | +| s3_bucket | An S3 bucket where to stage files. | +| snowflake_conn_id | An Airflow connection ID for Snowflake. | +| snowflake_database | A Snowflake database name. | +| snowflake_schema | A Snowflake schema name. | +| delete_local | If True, delete local files after processing. | +| **kwargs | Additional arguments to pass to the Airflow DAG. | + +----- + +
+ +
+Methods: + +----- + +**build_task_group()** + +Builds a task group to load data from files in a Sharefile directory to a +Snowflake table. + +Note that the arguments specified here are relative to the class +arguments provided at instantiation. For example, the sharefile_path +argument is relative to the sharefile_conn_id specified at the class +level. + +| Argument | Description | +|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| group_id | A name for the Airflow task group. | +| sharefile_source_path | A Sharefile path to extract data from. | +| sharefile_processed_path | A Sharefile path to move files to after they have been processed. If None, do not move processed files. Default is None. | +| local_rel_path | A local relative path to stage data in with respect to the class's local_base_path. This is also used to determine the staging S3 destination relative to the class's S3 bucket. | +| snowflake_table | A Snowflake table name to write data to. | +| preprocessor | A function to preprocess the files before loading them into Snowflake. Default is None. | +| preprocessor_kwargs | A dictionary of keyword arguments to pass to the preprocessor function. Default is None. | +| custom_metadata | A mapping of metadata field names to values to include in the target Snowflake table. | +| full_refresh | If True, performs a full refresh load in Snowflake. Default is False. | +| csv_encoding | Optional encoding to use for csv files. Default is 'utf-8'. | +| **kwargs | Additional keyword arguments to pass to the task group. | + +----- + +
+ +
+Example Yaml Files: + +```yaml +# airflow_config.yml +default_args: &default_args + owner: + run_as_user: + depends_on_past: + start_date: + email: + email_on_failure: False + retries: 0 + trigger_rule: + retry_delay: + execution_timeout: + sla: + +### Sharefile to Snowflake DAGs +sharefile_to_snowflake_dags__default_args: &sharefile_to_snowflake_dags__default_args + sharefile_conn_id: + # Null here means processed files will not be moved in Sharefile + sharefile_processed_dir: ~ + + local_base_path: + delete_local: + + s3_conn_id: + s3_bucket: + + snowflake_conn_id: + snowflake_database: + snowflake_schema: + + airflow_default_args: *default_task_args + schedule_interval: null + +sharefile_to_snowflake_dags: + resource_1: + <<: *sharefile_to_snowflake_dags__default_args + sharefile_base_path: path/to/folder + resource_2: + <<: *sharefile_to_snowflake_dags__default_args + sharefile_base_path: path/to/folder + + +# sharefile_resources.yml +# Default args for txt_to_csv preprocessor +txt_to_csv__defaults: &txt_to_csv__defaults + 'path_out': None + 'delimiter': ^ + 'has_header': False + 'column_names': None + # S3 to Snowflake task breaks if non-jsonl file retained + 'delete_txt': True + 'include_subdirs': False + +# Default args for each file type task group +resource__defaults: &resource__defaults + csv_encoding: utf-8 + preprocessor: txt_files_to_csv + preprocessor_kwargs: + <<: *txt_to_csv__defaults + + +resources: + # The resource group name determines the Snowflake table to load data to + resource_group_1: + resource_1: + <<: *resource__defaults + + resource_group_2: + resource_2: + <<: *resource__defaults + csv_encoding: utf-8-sig +``` +
# Providers Finally, this package contains a handful of custom DBT operators to be used as an alternative to PythonOperators. diff --git a/ea_airflow_util/__init__.py b/ea_airflow_util/__init__.py index d0d6d5f..a1bff3c 100644 --- a/ea_airflow_util/__init__.py +++ b/ea_airflow_util/__init__.py @@ -8,6 +8,7 @@ from ea_airflow_util.dags.dbt_snapshot_dag import DbtSnapshotDag from ea_airflow_util.dags.sftp_to_snowflake_dag import SFTPToSnowflakeDag from ea_airflow_util.dags.sharefile_custom_users_dag import LoadSharefileCustomUsersDag +from ea_airflow_util.dags.sharefile_to_snowflake_dag import SharefileToSnowflakeDag from ea_airflow_util.callables.airflow import xcom_pull_template from ea_airflow_util.callables import slack as slack_callbacks diff --git a/ea_airflow_util/callables/disk.py b/ea_airflow_util/callables/disk.py new file mode 100644 index 0000000..e2ce796 --- /dev/null +++ b/ea_airflow_util/callables/disk.py @@ -0,0 +1,11 @@ +import os +import shutil + +def delete_from_disk(local_path: str): + """ + Delete a file or directory from the local filesystem. + """ + if os.path.isfile(local_path): + os.remove(local_path) + elif os.path.isdir(local_path): + shutil.rmtree(local_path) \ No newline at end of file diff --git a/ea_airflow_util/callables/ea_csv.py b/ea_airflow_util/callables/ea_csv.py new file mode 100644 index 0000000..7b29d9c --- /dev/null +++ b/ea_airflow_util/callables/ea_csv.py @@ -0,0 +1,128 @@ +import os +import pandas as pd + + +def txt_to_csv( + file_in, + file_out=None, + delimiter=',', + has_header=True, + column_names=None, + delete_txt=False +): + """Convert a txt file to a csv. + + Args: + - file_in (str): A path to a txt file. + - file_out (str): A path to a csv file. If 'None', then the input file path + is used. + - delimiter (str): A txt file delimiter. + - has_header (bool): If True, use the first row of the txt file as a column + header. If False, insert a column header using the column_names arg. + Default is True. + - column_names (list[str]): An ordered list of column names to use in the + output csv. If 'None' and has_header is False, insert an ordered, + integer column header (e.g. 1, 2, ..., n where n is the number of + columns). + - delete_txt (bool): If True, delete the input txt file. + + Returns: + - file_out (str): A csv file path. + """ + + if has_header == True: + # Force str dtype, otherwise pandas will do things like cast int's to + # floats + df = pd.read_csv(file_in, delimiter=delimiter, dtype=str) + + elif has_header == False and column_names != None: + df = pd.read_csv(file_in, delimiter=delimiter, dtype=str, header=None) + df.columns = column_names + + elif has_header == False and column_names == None: + df = pd.read_csv(file_in, delimiter=delimiter, dtype=str, header=None) + # 1-indexed column labels can simplify downstream processing + df.columns = df.columns + 1 + + if file_out == None: + file_out = file_in[:-4] + '.csv' + + df.to_csv(file_out, index=False) + + if delete_txt: + os.remove(file_in) + + return file_out + + +def txt_files_to_csv( + path_in, + path_out=None, + delimiter=',', + has_header=False, + column_names=None, + delete_txt=False, + include_subdirs=False +): + """Convert all txt files in a directory to csv files. Also works with a + single txt file path and can be optionally configured to process files in + all subdirectories. + + Args: + - path_in (str): A file or directory path containing zero or more txt files. + - path_out (str): A file or directory path to write csv file(s) to. If + 'None', then the input path is used. Note that a file will retain its + original names except with a .csv extension. + - delimiter (str): A txt file delimiter. Note that this function assumes + that all txt files in an input directory use the same delimiter. + - has_header (bool): If True, use the first row of the txt file(s) as a + column header. If False, insert a column header using the column_names + arg. Default is True. + - column_names (list[str]): An ordered list of column names to use in the + output csv(s). If 'None' and has_header is False, insert an ordered, + integer column header (e.g. 1, 2, ..., n where n is the number of + columns). + - delete_txt (bool): If True, delete all of the input txt files. + - include_subdirs (bool): If True, process all files in all subdirectories. + If False, only process files in the top level of the specified + directory. Default is False. + + Returns: + - path_out (str): A file or directory path containing the output csv + file(s). + """ + + for root, _, files in os.walk(path_in): + + for file in files: + + # Only process txt files + if file[-4:] != '.txt': + continue + + filepath_in = os.path.join(root, file) + + if path_out == None: + dir_out = root + else: + dir_out = path_out + + filename_out = file[:-4] + '.csv' + filepath_out = os.path.join(dir_out, filename_out) + + txt_to_csv( + file_in=filepath_in, + file_out=filepath_out, + delimiter=delimiter, + has_header=has_header, + column_names=column_names, + delete_txt=delete_txt + ) + + if include_subdirs == False: + break + + if path_out == None: + path_out = path_in + + return path_out diff --git a/ea_airflow_util/callables/jsonl.py b/ea_airflow_util/callables/jsonl.py index e02123a..8e5dd0e 100644 --- a/ea_airflow_util/callables/jsonl.py +++ b/ea_airflow_util/callables/jsonl.py @@ -60,6 +60,7 @@ def translate_csv_file_to_jsonl( delete_csv : bool = False, metadata_dict: Optional[dict] = None, to_snake_case: bool = False, + csv_encoding: str = 'utf-8', **kwargs ): """ @@ -95,7 +96,7 @@ def translate_csv_file_to_jsonl( output_path_new = output_path try: - with open(full_local_path, 'r') as reader: + with open(full_local_path, 'r', encoding=csv_encoding) as reader: json_records = csv.DictReader(reader) serialize_json_records_to_disk(json_records, output_path_new, "w", metadata_dict, to_snake_case, **kwargs) except UnicodeDecodeError: diff --git a/ea_airflow_util/dags/s3_to_snowflake_dag.py b/ea_airflow_util/dags/s3_to_snowflake_dag.py index a9e8c71..9513c58 100644 --- a/ea_airflow_util/dags/s3_to_snowflake_dag.py +++ b/ea_airflow_util/dags/s3_to_snowflake_dag.py @@ -36,7 +36,7 @@ def __init__(self, is_manual_upload: bool = False, pool: str, - full_replace: bool = False, #TODO once on latest version of airflow, use dagrun parameter to allow full_replace runs even if not set here at dag level + full_replace: bool = False, do_delete_from_source: bool = True, **kwargs diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag.py new file mode 100644 index 0000000..b2e661e --- /dev/null +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag.py @@ -0,0 +1,235 @@ +from airflow.operators.python import PythonOperator +from airflow.utils.task_group import TaskGroup + +from ea_airflow_util.callables import jsonl, s3, ea_csv, disk +from ea_airflow_util.callables.airflow import xcom_pull_template +from ea_airflow_util.callables.sharefile import sharefile_copy_file +from ea_airflow_util.providers.sharefile.transfers.sharefile_to_disk import SharefileToDiskOperator +from ea_airflow_util.providers.aws.operators.s3 import S3ToSnowflakeOperator +from ea_airflow_util.dags.ea_custom_dag import EACustomDAG +from airflow.utils.helpers import chain + +from airflow.exceptions import AirflowException + + +class SharefileToSnowflakeDag: + """ + A class to build an Airflow DAG to extract data from flat files (csv and + txt) on ShareFile to tables in a Snowflake database schema. + + Parameters: + - sharefile_conn_id (str): A Sharefile connection ID. + - local_base_path (str): A base local path for downloading files. + - s3_conn_id (str): An Airflow connection ID for AWS S3. + - s3_bucket (str): An S3 bucket where to stage files. + - snowflake_conn_id (str): An Airflow connection ID for Snowflake. + - snowflake_database (str): A Snowflake database name. + - snowflake_schema (str): A Snowflake schema name. + - delete_local (bool): If True, delete the local files after processing. Default is False. + - **kwargs: Additional arguments to pass to the Airflow DAG. + """ + + def __init__( + self, + + sharefile_conn_id: str, + + local_base_path: str, + + s3_conn_id: str, + s3_bucket: str, + + snowflake_conn_id: str, + snowflake_database: str, + snowflake_schema: str, + + delete_local: bool = False, + + **kwargs + ) -> None: + self.sharefile_conn_id = sharefile_conn_id + + self.local_base_path = local_base_path + + self.s3_conn_id = s3_conn_id + self.s3_bucket = s3_bucket + + self.snowflake_conn_id = snowflake_conn_id + self.snowflake_database = snowflake_database + self.snowflake_schema = snowflake_schema + + self.dag = EACustomDAG(**kwargs) + + self.sharefile_task_groups = [] + + self.delete_local = delete_local + + # Only preprocessors that produce csv files are supported since the + # csv_to_jsonl operator expects csv files. + self.valid_preprocessors = {name: obj for name, obj in vars(ea_csv).items() if callable(obj)} + + def build_task_group( + self, + group_id, + sharefile_source_path, + local_rel_path, + snowflake_table, + sharefile_processed_path=None, + preprocessor=None, + preprocessor_kwargs=None, + custom_metadata={}, + full_refresh=False, + csv_encoding='utf-8', + **kwargs + ): + """Builds a task group to load data from csv and txt files in a + Sharefile directory to a Snowflake table. + + Note that the arguments specified here are relative to the class + arguments provided at instantiation. For example, the sharefile_path + argument is relative to the sharefile_conn_id specified at the class + level. + + Parameters: + - group_id (str): A name for the Airflow task group. + - sharefile_source_path (str): A Sharefile path to extract data from. + - sharefile_processed_path (str): A Sharefile path to move files to + after they have been processed. If None, do not move processed + files. Default is None. + - local_rel_path (str): A local relative path to stage data in with + respect to the class's local_base_path. This is also used to + determine the staging S3 destination relative to the class's S3 + bucket. + - snowflake_table (str): A Snowflake table name to write data to. + - preprocessor (callable): A function to preprocess the files before + loading them into Snowflake. Default is None. + - preprocessor_kwargs (dict): A dictionary of keyword arguments to pass + to the preprocessor function. Default is None. + - custom_metadata (dict): A mapping of metadata field names to values + to include in the target Snowflake table. + - full_refresh (bool): If True, performs a full refresh load in + Snowflake. Default is False. + - csv_encoding (str): Optional encoding to use for csv files. Default is + 'utf-8'. + - **kwargs: Additional keyword arguments to pass to the task group. + """ + # Define delete_local at the task group level to ensure that the + # delete_from_disk task is only included if delete_local is True. + delete_local = self.delete_local + + with TaskGroup(group_id=group_id, dag=self.dag, **kwargs) as task_group: + + sharefile_to_disk = SharefileToDiskOperator( + task_id=f"sharefile_to_disk", + sharefile_conn_id=self.sharefile_conn_id, + sharefile_path=sharefile_source_path, + local_path=f'{self.local_base_path}/{local_rel_path}', + delete_remote=False, + dag=self.dag + ) + + if preprocessor is not None and preprocessor_kwargs is not None: + + if preprocessor not in self.valid_preprocessors: + raise AirflowException(f"Invalid file preprocessor '{preprocessor}' provided. Valid preprocessors are: {list(self.valid_preprocessors.keys())}") + + # Create a copy of preprocessor_kwargs to avoid mutating the original dict + # and ensure proper template rendering in op_kwargs + op_kwargs = preprocessor_kwargs.copy() + op_kwargs['path_in'] = xcom_pull_template(sharefile_to_disk) + + preprocess_files = PythonOperator( + task_id=f"preprocess_files", + python_callable=self.valid_preprocessors[preprocessor], + op_kwargs=op_kwargs, + dag=self.dag + ) + else: + # If no preprocessor is provided, use a dummy operator that + # just passes the files through + preprocess_files = PythonOperator( + task_id=f"preprocess_files", + python_callable=lambda x: x, + op_kwargs={'x': xcom_pull_template(sharefile_to_disk)}, + dag=self.dag + ) + + csv_to_jsonl = PythonOperator( + task_id=f"csv_to_jsonl", + python_callable=jsonl.translate_csv_file_to_jsonl, + op_kwargs={ + 'local_path': xcom_pull_template(preprocess_files), + 'output_path': None, + # S3 to Snowflake task breaks if non-jsonl file retained + 'delete_csv': True, + 'csv_encoding': csv_encoding, + }, + dag=self.dag + ) + + disk_to_s3 = PythonOperator( + task_id=f"disk_to_s3", + python_callable=s3.disk_to_s3, + op_kwargs={ + 'local_path': xcom_pull_template(csv_to_jsonl), + 's3_conn_id': self.s3_conn_id, + 'bucket': self.s3_bucket, + 'base_dir': self.local_base_path, + 'delete_local': False, + }, + dag=self.dag + ) + + # Only define delete_from_disk task if delete_local is True. + # Otherwise, it will always be included as an Airflow task, even if + # delete_local is False. + if delete_local == True: + delete_from_disk = PythonOperator( + task_id=f"delete_from_disk", + python_callable=disk.delete_from_disk, + op_kwargs={ + 'local_path': xcom_pull_template(csv_to_jsonl), + }, + dag=self.dag + ) + + s3_to_snowflake = S3ToSnowflakeOperator( + task_id=f"s3_to_snowflake", + snowflake_conn_id=self.snowflake_conn_id, + database=self.snowflake_database, + schema=self.snowflake_schema, + table_name=snowflake_table, + custom_metadata_columns=custom_metadata, + s3_destination_key=xcom_pull_template(disk_to_s3), + full_refresh=full_refresh, + dag=self.dag + ) + + if sharefile_processed_path is not None: + # Move processed files to specific Sharefile location + move_to_processed = PythonOperator( + task_id=f'move_to_processed', + python_callable=sharefile_copy_file, + op_kwargs={ + 'sharefile_conn_id': self.sharefile_conn_id, + 'sharefile_path': sharefile_source_path, + 'sharefile_dest_dir': sharefile_processed_path, + 'delete_source': True, + }, + dag=self.dag + ) + + task_list = [] + task_list.append(sharefile_to_disk) + task_list.append(preprocess_files) + task_list.append(csv_to_jsonl) + task_list.append(disk_to_s3) + if self.delete_local == True: + task_list.append([s3_to_snowflake, delete_from_disk]) + else: + task_list.append(s3_to_snowflake) + if sharefile_processed_path is not None: + task_list.append(move_to_processed) + + chain(*task_list) + diff --git a/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py b/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py index 70184b9..6b84afd 100644 --- a/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py +++ b/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py @@ -50,92 +50,127 @@ def execute(self, **context): sf_hook = SharefileHook(sharefile_conn_id=self.sharefile_conn_id) sf_hook.get_conn() - # get the item id of the remote path, find all files within that path (up to 1000) - base_path_id = sf_hook.get_path_id(self.sharefile_path) - remote_files = sf_hook.find_files(base_path_id) - - # check whether we found anything - if len(remote_files) == 0: - self.log.info("No files on FTP") - raise AirflowSkipException - - # extract relevant file details - files = [] - for res in remote_files: - file_details = { - 'file_name': res['FileName'], - 'size': res['Size'], - 'parent_id': res['ParentID'], - 'file_path_no_base': res['ParentSemanticPath'].replace(self.sharefile_path, ''), - 'file_path_ftp': res['ParentSemanticPath'], - 'item_id': res['ItemID'] - } - - - files.append(file_details) - - if self.most_recent_file: - # of files found in directory, find the one with the most recent edit timestamp - max_timestamp = None - chosen_file = [] - for res in files: - # seem to be cases where search is out of date and returns items that don't exist - try: - item_info = sf_hook.item_info(res['item_id']) - except: - # if the item fails to fetch item info, it probably doesn't exist, so can't be most recent - continue - # grab last modified, compare to current max known - item_last_modified = item_info['ProgenyEditDate'] - if max_timestamp is None or item_last_modified > max_timestamp: - max_timestamp = item_last_modified - chosen_file = [res] - # overwrite files list with singular chosen item - files = chosen_file - - - # for all files, move to local - num_successes = 0 - - for file in files: + # get the item ID of the privided path (either file or folder) + item_id = sf_hook.get_path_id(self.sharefile_path) + if not item_id: + raise AirflowException(f"Cound not find item ID for path: {self.sharefile_path}") + + try: + item_info = sf_hook.item_info(item_id) + except Exception as e: + raise AirflowException(f"Failed to retrieve metadata for item {item_id}: {e}") + + # Case 1: single file + if item_info["odata.type"] == "ShareFile.Api.Models.File": + file_name = item_info['FileName'] + full_local_path = os.path.join(self.local_path, file_name) - remote_file = os.path.join(file['file_path_ftp'], file['file_name']) - self.log.info("Attempting to get file " + remote_file) - - # lower filename and replace spaces with underscores - file['file_name'] = file['file_name'].lower().replace(' ', '_') - - # check to see if there is other metadata needed in local path and if not, add filename to local path - if file['parent_id'] == base_path_id: - full_local_path = os.path.join(self.local_path, file['file_name']) - else: - full_local_path = os.path.join(self.local_path, file['file_path_no_base'], file['file_name']) - - # create dir (works if there is a file name or not) os.makedirs(os.path.dirname(full_local_path), exist_ok=True) - # download the file try: - sf_hook.download_to_disk(item_id=file['item_id'], local_path=full_local_path) - + sf_hook.download_to_disk(item_id=item_id, local_path=full_local_path) if self.delete_remote: - sf_hook.delete(file['item_id']) - - num_successes += 1 - + sf_hook.delete(item_id) + + return self.local_path + except Exception as err: - self.log.error(f'Failed to get file with message: {err}') - + self.log.error(f"Failed to download file {self.sharefile_path}: {err}") if slack_conn_id := context["dag"].user_defined_macros.get("slack_conn_id"): slack.slack_alert_download_failure( context=context, http_conn_id=slack_conn_id, - remote_path=remote_file, local_path=full_local_path, error=err + remote_path=self.sharefile_path, local_path=full_local_path, error=err ) + raise AirflowException("Single file transfer failed") + + else: + + # get the item id of the remote path, find all files within that path (up to 1000) + base_path_id = item_id + remote_files = sf_hook.find_files(base_path_id) + + # check whether we found anything + if len(remote_files) == 0: + self.log.info("No files on FTP") + raise AirflowSkipException + + # extract relevant file details + files = [] + for res in remote_files: + file_details = { + 'file_name': res['FileName'], + 'size': res['Size'], + 'parent_id': res['ParentID'], + 'file_path_no_base': res['ParentSemanticPath'].replace(self.sharefile_path, ''), + 'file_path_ftp': res['ParentSemanticPath'], + 'item_id': res['ItemID'] + } + + + files.append(file_details) + + if self.most_recent_file: + # of files found in directory, find the one with the most recent edit timestamp + max_timestamp = None + chosen_file = [] + for res in files: + # seem to be cases where search is out of date and returns items that don't exist + try: + item_info = sf_hook.item_info(res['item_id']) + except: + # if the item fails to fetch item info, it probably doesn't exist, so can't be most recent + continue + # grab last modified, compare to current max known + item_last_modified = item_info['ProgenyEditDate'] + if max_timestamp is None or item_last_modified > max_timestamp: + max_timestamp = item_last_modified + chosen_file = [res] + # overwrite files list with singular chosen item + files = chosen_file + + + # for all files, move to local + num_successes = 0 + + for file in files: + + remote_file = os.path.join(file['file_path_ftp'], file['file_name']) + self.log.info("Attempting to get file " + remote_file) + + # lower filename and replace spaces with underscores + file['file_name'] = file['file_name'].lower().replace(' ', '_') + + # check to see if there is other metadata needed in local path and if not, add filename to local path + if file['parent_id'] == base_path_id: + full_local_path = os.path.join(self.local_path, file['file_name']) + else: + full_local_path = os.path.join(self.local_path, file['file_path_no_base'], file['file_name']) + + # create dir (works if there is a file name or not) + os.makedirs(os.path.dirname(full_local_path), exist_ok=True) + + # download the file + try: + sf_hook.download_to_disk(item_id=file['item_id'], local_path=full_local_path) + + if self.delete_remote: + sf_hook.delete(file['item_id']) - continue + num_successes += 1 + + except Exception as err: + self.log.error(f'Failed to get file with message: {err}') + + if slack_conn_id := context["dag"].user_defined_macros.get("slack_conn_id"): + slack.slack_alert_download_failure( + context=context, http_conn_id=slack_conn_id, + remote_path=remote_file, local_path=full_local_path, error=err + ) + + continue - if num_successes == 0: - raise AirflowException("Failed transfer from ShareFile to local: no files transferred successfully!") + if num_successes == 0: + raise AirflowException("Failed transfer from ShareFile to local: no files transferred successfully!") - return self.local_path + return self.local_path