diff --git a/README.md b/README.md index 537f365..81bff14 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,67 @@ Raise an error if a name-collision occurs after formatting. +## ea_csv +Helpers for working with csv's. + +
+See more: + +----- + +### txt_to_csv +Convert a txt file to a csv. + +Args: +- file_in (str): A path to a txt file. +- file_out (str): A path to a csv file. If 'None', then the input file path + is used. +- delimiter (str): A txt file delimiter. +- has_header (bool): If True, use the first row of the txt file as a column + header. If False, insert a column header using the column_names arg. + Default is True. +- column_names (list[str]): An ordered list of column names to use in the + output csv. If 'None' and has_header is False, insert an ordered, + integer column header (e.g. 1, 2, ..., n where n is the number of + columns). +- delete_txt (bool): If True, delete the input txt file. + +Returns: +- file_out (str): A csv file path. + +### txt_files_to_csv +Convert all txt files in a directory to csv files. Also works with a +single txt file path and can be optionally configured to process files in +all subdirectories. + +Args: +- path_in (str): A file or directory path containing zero or more txt files. +- path_out (str): A file or directory path to write csv file(s) to. If + 'None', then the input path is used. Note that a file will retain its + original names except with a .csv extension. +- delimiter (str): A txt file delimiter. Note that this function assumes + that all txt files in an input directory use the same delimiter. +- has_header (bool): If True, use the first row of the txt file(s) as a + column header. If False, insert a column header using the column_names + arg. Default is True. +- column_names (list[str]): An ordered list of column names to use in the + output csv(s). If 'None' and has_header is False, insert an ordered, + integer column header (e.g. 1, 2, ..., n where n is the number of + columns). +- delete_txt (bool): If True, delete all of the input txt files. +- include_subdirs (bool): If True, process all files in all subdirectories. + If False, only process files in the top level of the specified + directory. Default is False. + +Returns: +- path_out (str): A file or directory path containing the output csv + file(s). + +----- + +
+ + ## ftp FTP- and SFTP-utility helpers @@ -623,7 +684,113 @@ For example, `/ed-fi/apiClients/districts-2425-ds5/{tenant_code}/prod/Stadium` w +## SharefileToSnowflakeDag +`SharefileToSnowflakeDag` is an Airflow DAG that automates the process of +transferring files from ShareFile to Snowflake. The DAG retrieves txt and csv +files from a specified ShareFile location, transforms them into JSONL format, +uploads the files to an S3 bucket, and finally loads the data into a Snowflake +database. + +
+Arguments: +----- + +| Argument | Description | +|-------------------------|--------------------------------------------------------------------------| +| sharefile_conn_id | A Sharefile connection ID. | +| local_base_path | A base local path for downloading files. | +| s3_conn_id | An Airflow connection ID for AWS S3. | +| s3_bucket | An S3 bucket where to stage files. | +| snowflake_conn_id | An Airflow connection ID for Snowflake. | +| snowflake_database | A Snowflake database name. | +| snowflake_schema | A Snowflake schema name. | +| **kwargs | Additional arguments to pass to the Airflow DAG. | + +----- + +
+ +
+Methods: + +----- + +**build_task_group()** + +Builds a task group to load data from csv and txt files in a +Sharefile directory to a Snowflake table. + +Note that the arguments specified here are relative to the class +arguments provided at instantiation. For example, the sharefile_path +argument is relative to the sharefile_conn_id specified at the class +level. + +| Argument | Description | +|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| group_id | A name for the Airflow task group. | +| sharefile_source_path | A Sharefile path to extract data from. | +| sharefile_processed_path | A Sharefile path to move files to after they have been processed. If None, do not move processed files. Default is None. | +| local_rel_path | A local relative path to stage data in with respect to the class's local_base_path. This is also used to determine the staging S3 destination relative to the class's S3 bucket. | +| snowflake_table | A Snowflake table name to write data to. | +| txt_delimiter | A text delimiter used in the txt files to load. Default is ','. | +| txt_has_header | If True, uses the first row of the txt file as a column header. If False, inserts a column header based on the txt columns arg. Default is True. | +| txt_columns | An ordered list of column names in the txt files to load. If None and txt_has_header is False, then columns are labeled using integers (i.e. 1, 2, 3, ..., n, where n is the number of columns). Default is None. | +| custom_metadata | A mapping of metadata field names to values to include in the target Snowflake table. | +| full_refresh | If True, performs a full refresh load in Snowflake. Default is False. | +| csv_encoding | Optional encoding to use for csv files. Default is 'utf-8'. | +| **kwargs | Additional keyword arguments to pass to the task group. | + +----- + +
+ +
+Example Yaml File: + +```yaml +default_args: &default_args + owner: + run_as_user: + depends_on_past: + start_date: + email: + email_on_failure: False + retries: 0 + trigger_rule: + retry_delay: + execution_timeout: + sla: + +### Sharefile to Snowflake DAGs +sharefile_to_snowflake_dags__default_args: &sharefile_to_snowflake_dags__default_args + sharefile_conn_id: + # Null here means processed files will not be moved in Sharefile + sharefile_processed_dir: + + local_base_path: + + s3_conn_id: + s3_bucket: + + snowflake_conn_id: + snowflake_database: + snowflake_schema: + + airflow_default_args: *default_task_args + schedule_interval: null + +sharefile_to_snowflake_dags: + resource_1: + <<: *sharefile_to_snowflake_dags__default_args + sharefile_base_path: path/to/folder + snowflake_table: + resource_2: + <<: *sharefile_to_snowflake_dags__default_args + sharefile_base_path: path/to/folder + snowflake_table: +``` +
# Providers Finally, this package contains a handful of custom DBT operators to be used as an alternative to PythonOperators. diff --git a/ea_airflow_util/__init__.py b/ea_airflow_util/__init__.py index d0d6d5f..a1bff3c 100644 --- a/ea_airflow_util/__init__.py +++ b/ea_airflow_util/__init__.py @@ -8,6 +8,7 @@ from ea_airflow_util.dags.dbt_snapshot_dag import DbtSnapshotDag from ea_airflow_util.dags.sftp_to_snowflake_dag import SFTPToSnowflakeDag from ea_airflow_util.dags.sharefile_custom_users_dag import LoadSharefileCustomUsersDag +from ea_airflow_util.dags.sharefile_to_snowflake_dag import SharefileToSnowflakeDag from ea_airflow_util.callables.airflow import xcom_pull_template from ea_airflow_util.callables import slack as slack_callbacks diff --git a/ea_airflow_util/callables/ea_csv.py b/ea_airflow_util/callables/ea_csv.py new file mode 100644 index 0000000..7b29d9c --- /dev/null +++ b/ea_airflow_util/callables/ea_csv.py @@ -0,0 +1,128 @@ +import os +import pandas as pd + + +def txt_to_csv( + file_in, + file_out=None, + delimiter=',', + has_header=True, + column_names=None, + delete_txt=False +): + """Convert a txt file to a csv. + + Args: + - file_in (str): A path to a txt file. + - file_out (str): A path to a csv file. If 'None', then the input file path + is used. + - delimiter (str): A txt file delimiter. + - has_header (bool): If True, use the first row of the txt file as a column + header. If False, insert a column header using the column_names arg. + Default is True. + - column_names (list[str]): An ordered list of column names to use in the + output csv. If 'None' and has_header is False, insert an ordered, + integer column header (e.g. 1, 2, ..., n where n is the number of + columns). + - delete_txt (bool): If True, delete the input txt file. + + Returns: + - file_out (str): A csv file path. + """ + + if has_header == True: + # Force str dtype, otherwise pandas will do things like cast int's to + # floats + df = pd.read_csv(file_in, delimiter=delimiter, dtype=str) + + elif has_header == False and column_names != None: + df = pd.read_csv(file_in, delimiter=delimiter, dtype=str, header=None) + df.columns = column_names + + elif has_header == False and column_names == None: + df = pd.read_csv(file_in, delimiter=delimiter, dtype=str, header=None) + # 1-indexed column labels can simplify downstream processing + df.columns = df.columns + 1 + + if file_out == None: + file_out = file_in[:-4] + '.csv' + + df.to_csv(file_out, index=False) + + if delete_txt: + os.remove(file_in) + + return file_out + + +def txt_files_to_csv( + path_in, + path_out=None, + delimiter=',', + has_header=False, + column_names=None, + delete_txt=False, + include_subdirs=False +): + """Convert all txt files in a directory to csv files. Also works with a + single txt file path and can be optionally configured to process files in + all subdirectories. + + Args: + - path_in (str): A file or directory path containing zero or more txt files. + - path_out (str): A file or directory path to write csv file(s) to. If + 'None', then the input path is used. Note that a file will retain its + original names except with a .csv extension. + - delimiter (str): A txt file delimiter. Note that this function assumes + that all txt files in an input directory use the same delimiter. + - has_header (bool): If True, use the first row of the txt file(s) as a + column header. If False, insert a column header using the column_names + arg. Default is True. + - column_names (list[str]): An ordered list of column names to use in the + output csv(s). If 'None' and has_header is False, insert an ordered, + integer column header (e.g. 1, 2, ..., n where n is the number of + columns). + - delete_txt (bool): If True, delete all of the input txt files. + - include_subdirs (bool): If True, process all files in all subdirectories. + If False, only process files in the top level of the specified + directory. Default is False. + + Returns: + - path_out (str): A file or directory path containing the output csv + file(s). + """ + + for root, _, files in os.walk(path_in): + + for file in files: + + # Only process txt files + if file[-4:] != '.txt': + continue + + filepath_in = os.path.join(root, file) + + if path_out == None: + dir_out = root + else: + dir_out = path_out + + filename_out = file[:-4] + '.csv' + filepath_out = os.path.join(dir_out, filename_out) + + txt_to_csv( + file_in=filepath_in, + file_out=filepath_out, + delimiter=delimiter, + has_header=has_header, + column_names=column_names, + delete_txt=delete_txt + ) + + if include_subdirs == False: + break + + if path_out == None: + path_out = path_in + + return path_out diff --git a/ea_airflow_util/callables/jsonl.py b/ea_airflow_util/callables/jsonl.py index e02123a..8e5dd0e 100644 --- a/ea_airflow_util/callables/jsonl.py +++ b/ea_airflow_util/callables/jsonl.py @@ -60,6 +60,7 @@ def translate_csv_file_to_jsonl( delete_csv : bool = False, metadata_dict: Optional[dict] = None, to_snake_case: bool = False, + csv_encoding: str = 'utf-8', **kwargs ): """ @@ -95,7 +96,7 @@ def translate_csv_file_to_jsonl( output_path_new = output_path try: - with open(full_local_path, 'r') as reader: + with open(full_local_path, 'r', encoding=csv_encoding) as reader: json_records = csv.DictReader(reader) serialize_json_records_to_disk(json_records, output_path_new, "w", metadata_dict, to_snake_case, **kwargs) except UnicodeDecodeError: diff --git a/ea_airflow_util/dags/s3_to_snowflake_dag.py b/ea_airflow_util/dags/s3_to_snowflake_dag.py index a9e8c71..9513c58 100644 --- a/ea_airflow_util/dags/s3_to_snowflake_dag.py +++ b/ea_airflow_util/dags/s3_to_snowflake_dag.py @@ -36,7 +36,7 @@ def __init__(self, is_manual_upload: bool = False, pool: str, - full_replace: bool = False, #TODO once on latest version of airflow, use dagrun parameter to allow full_replace runs even if not set here at dag level + full_replace: bool = False, do_delete_from_source: bool = True, **kwargs diff --git a/ea_airflow_util/dags/sharefile_to_snowflake_dag.py b/ea_airflow_util/dags/sharefile_to_snowflake_dag.py new file mode 100644 index 0000000..f6513ff --- /dev/null +++ b/ea_airflow_util/dags/sharefile_to_snowflake_dag.py @@ -0,0 +1,208 @@ +from airflow.operators.python import PythonOperator +from airflow.utils.task_group import TaskGroup + +from ea_airflow_util.callables import jsonl, s3, ea_csv +from ea_airflow_util.callables.airflow import xcom_pull_template +from ea_airflow_util.callables.sharefile import sharefile_copy_file +from ea_airflow_util.providers.sharefile.transfers.sharefile_to_disk import SharefileToDiskOperator +from ea_airflow_util.providers.aws.operators.s3 import S3ToSnowflakeOperator +from ea_airflow_util.dags.ea_custom_dag import EACustomDAG + + +class SharefileToSnowflakeDag: + """ + A class to build an Airflow DAG to extract data from flat files (csv and + txt) on ShareFile to tables in a Snowflake database schema. + + Parameters: + - sharefile_conn_id (str): A Sharefile connection ID. + - local_base_path (str): A base local path for downloading files. + - s3_conn_id (str): An Airflow connection ID for AWS S3. + - s3_bucket (str): An S3 bucket where to stage files. + - snowflake_conn_id (str): An Airflow connection ID for Snowflake. + - snowflake_database (str): A Snowflake database name. + - snowflake_schema (str): A Snowflake schema name. + - **kwargs: Additional arguments to pass to the Airflow DAG. + """ + + def __init__( + self, + + sharefile_conn_id: str, + + local_base_path: str, + + s3_conn_id: str, + s3_bucket: str, + + snowflake_conn_id: str, + snowflake_database: str, + snowflake_schema: str, + + **kwargs + ) -> None: + self.sharefile_conn_id = sharefile_conn_id + + self.local_base_path = local_base_path + + self.s3_conn_id = s3_conn_id + self.s3_bucket = s3_bucket + + self.snowflake_conn_id = snowflake_conn_id + self.snowflake_database = snowflake_database + self.snowflake_schema = snowflake_schema + + self.dag = EACustomDAG(**kwargs) + + self.sharefile_task_groups = [] + + def build_task_group( + self, + group_id, + sharefile_source_path, + local_rel_path, + snowflake_table, + sharefile_processed_path=None, + txt_delimiter=',', + txt_has_header=True, + txt_columns=None, + custom_metadata={}, + full_refresh=False, + csv_encoding='utf-8', + **kwargs + ): + """Builds a task group to load data from csv and txt files in a + Sharefile directory to a Snowflake table. + + Note that the arguments specified here are relative to the class + arguments provided at instantiation. For example, the sharefile_path + argument is relative to the sharefile_conn_id specified at the class + level. + + Parameters: + - group_id (str): A name for the Airflow task group. + - sharefile_source_path (str): A Sharefile path to extract data from. + - sharefile_processed_path (str): A Sharefile path to move files to + after they have been processed. If None, do not move processed + files. Default is None. + - local_rel_path (str): A local relative path to stage data in with + respect to the class's local_base_path. This is also used to + determine the staging S3 destination relative to the class's S3 + bucket. + - snowflake_table (str): A Snowflake table name to write data to. + - txt_delimiter (str): A text delimiter used in the txt files to load. + Default is ','. + - txt_has_header (str): If True, uses the first row of the txt file as + a column header. If False, inserts a column header based on the txt + columns arg. Default is True. + - txt_columns (str): An ordered list of column names in the txt files + to load. If None and txt_has_header is False, then columns are + labeled using integers (i.e. 1, 2, 3, ..., n, where n is the number + of columns). Default is None. + - custom_metadata (dict): A mapping of metadata field names to values + to include in the target Snowflake table. + - full_refresh (bool): If True, performs a full refresh load in + Snowflake. Default is False. + - csv_encoding (str): Optional encoding to use for csv files. Default is + 'utf-8'. + - **kwargs: Additional keyword arguments to pass to the task group. + """ + + with TaskGroup(group_id=group_id, dag=self.dag, **kwargs) as task_group: + + sharefile_to_disk = SharefileToDiskOperator( + task_id=f"sharefile_to_disk", + sharefile_conn_id=self.sharefile_conn_id, + sharefile_path=sharefile_source_path, + local_path=f'{self.local_base_path}/{local_rel_path}', + delete_remote=False, + dag=self.dag + ) + + # TODO: Make this task a customizable preprocessing operator that + # gets passed to the build_task_group method + txt_to_csv = PythonOperator( + task_id=f"txt_to_csv", + python_callable=ea_csv.txt_files_to_csv, + op_kwargs={ + 'path_in': xcom_pull_template(sharefile_to_disk), + 'path_out': None, + 'delimiter': txt_delimiter, + 'has_header': txt_has_header, + 'column_names': txt_columns, + # S3 to Snowflake task breaks if non-jsonl file retained + 'delete_txt': True, + 'include_subdirs': False, + }, + dag=self.dag + ) + + csv_to_jsonl = PythonOperator( + task_id=f"csv_to_jsonl", + python_callable=jsonl.translate_csv_file_to_jsonl, + op_kwargs={ + 'local_path': xcom_pull_template(txt_to_csv), + 'output_path': None, + # S3 to Snowflake task breaks if non-jsonl file retained + 'delete_csv': True, + 'csv_encoding': csv_encoding, + }, + dag=self.dag + ) + + disk_to_s3 = PythonOperator( + task_id=f"disk_to_s3", + python_callable=s3.disk_to_s3, + op_kwargs={ + 'local_path': xcom_pull_template(csv_to_jsonl), + 's3_conn_id': self.s3_conn_id, + 'bucket': self.s3_bucket, + 'base_dir': self.local_base_path, + 'delete_local': False, + }, + dag=self.dag + ) + + s3_to_snowflake = S3ToSnowflakeOperator( + task_id=f"s3_to_snowflake", + snowflake_conn_id=self.snowflake_conn_id, + database=self.snowflake_database, + schema=self.snowflake_schema, + table_name=snowflake_table, + custom_metadata_columns=custom_metadata, + s3_destination_key=xcom_pull_template(disk_to_s3), + full_refresh=full_refresh, + dag=self.dag + ) + + if sharefile_processed_path is None: + ( + sharefile_to_disk + >> txt_to_csv + >> csv_to_jsonl + >> disk_to_s3 + >> s3_to_snowflake + ) + else: + # Move processed files to specific Sharefile location + move_to_processed = PythonOperator( + task_id=f'move_to_processed', + python_callable=sharefile_copy_file, + op_kwargs={ + 'sharefile_conn_id': self.sharefile_conn_id, + 'sharefile_path': sharefile_source_path, + 'sharefile_dest_dir': sharefile_processed_path, + 'delete_source': True, + }, + dag=self.dag + ) + ( + sharefile_to_disk + >> txt_to_csv + >> csv_to_jsonl + >> disk_to_s3 + >> s3_to_snowflake + >> move_to_processed + ) + + self.sharefile_task_groups.append(task_group) diff --git a/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py b/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py index 70184b9..6b84afd 100644 --- a/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py +++ b/ea_airflow_util/providers/sharefile/transfers/sharefile_to_disk.py @@ -50,92 +50,127 @@ def execute(self, **context): sf_hook = SharefileHook(sharefile_conn_id=self.sharefile_conn_id) sf_hook.get_conn() - # get the item id of the remote path, find all files within that path (up to 1000) - base_path_id = sf_hook.get_path_id(self.sharefile_path) - remote_files = sf_hook.find_files(base_path_id) - - # check whether we found anything - if len(remote_files) == 0: - self.log.info("No files on FTP") - raise AirflowSkipException - - # extract relevant file details - files = [] - for res in remote_files: - file_details = { - 'file_name': res['FileName'], - 'size': res['Size'], - 'parent_id': res['ParentID'], - 'file_path_no_base': res['ParentSemanticPath'].replace(self.sharefile_path, ''), - 'file_path_ftp': res['ParentSemanticPath'], - 'item_id': res['ItemID'] - } - - - files.append(file_details) - - if self.most_recent_file: - # of files found in directory, find the one with the most recent edit timestamp - max_timestamp = None - chosen_file = [] - for res in files: - # seem to be cases where search is out of date and returns items that don't exist - try: - item_info = sf_hook.item_info(res['item_id']) - except: - # if the item fails to fetch item info, it probably doesn't exist, so can't be most recent - continue - # grab last modified, compare to current max known - item_last_modified = item_info['ProgenyEditDate'] - if max_timestamp is None or item_last_modified > max_timestamp: - max_timestamp = item_last_modified - chosen_file = [res] - # overwrite files list with singular chosen item - files = chosen_file - - - # for all files, move to local - num_successes = 0 - - for file in files: + # get the item ID of the privided path (either file or folder) + item_id = sf_hook.get_path_id(self.sharefile_path) + if not item_id: + raise AirflowException(f"Cound not find item ID for path: {self.sharefile_path}") + + try: + item_info = sf_hook.item_info(item_id) + except Exception as e: + raise AirflowException(f"Failed to retrieve metadata for item {item_id}: {e}") + + # Case 1: single file + if item_info["odata.type"] == "ShareFile.Api.Models.File": + file_name = item_info['FileName'] + full_local_path = os.path.join(self.local_path, file_name) - remote_file = os.path.join(file['file_path_ftp'], file['file_name']) - self.log.info("Attempting to get file " + remote_file) - - # lower filename and replace spaces with underscores - file['file_name'] = file['file_name'].lower().replace(' ', '_') - - # check to see if there is other metadata needed in local path and if not, add filename to local path - if file['parent_id'] == base_path_id: - full_local_path = os.path.join(self.local_path, file['file_name']) - else: - full_local_path = os.path.join(self.local_path, file['file_path_no_base'], file['file_name']) - - # create dir (works if there is a file name or not) os.makedirs(os.path.dirname(full_local_path), exist_ok=True) - # download the file try: - sf_hook.download_to_disk(item_id=file['item_id'], local_path=full_local_path) - + sf_hook.download_to_disk(item_id=item_id, local_path=full_local_path) if self.delete_remote: - sf_hook.delete(file['item_id']) - - num_successes += 1 - + sf_hook.delete(item_id) + + return self.local_path + except Exception as err: - self.log.error(f'Failed to get file with message: {err}') - + self.log.error(f"Failed to download file {self.sharefile_path}: {err}") if slack_conn_id := context["dag"].user_defined_macros.get("slack_conn_id"): slack.slack_alert_download_failure( context=context, http_conn_id=slack_conn_id, - remote_path=remote_file, local_path=full_local_path, error=err + remote_path=self.sharefile_path, local_path=full_local_path, error=err ) + raise AirflowException("Single file transfer failed") + + else: + + # get the item id of the remote path, find all files within that path (up to 1000) + base_path_id = item_id + remote_files = sf_hook.find_files(base_path_id) + + # check whether we found anything + if len(remote_files) == 0: + self.log.info("No files on FTP") + raise AirflowSkipException + + # extract relevant file details + files = [] + for res in remote_files: + file_details = { + 'file_name': res['FileName'], + 'size': res['Size'], + 'parent_id': res['ParentID'], + 'file_path_no_base': res['ParentSemanticPath'].replace(self.sharefile_path, ''), + 'file_path_ftp': res['ParentSemanticPath'], + 'item_id': res['ItemID'] + } + + + files.append(file_details) + + if self.most_recent_file: + # of files found in directory, find the one with the most recent edit timestamp + max_timestamp = None + chosen_file = [] + for res in files: + # seem to be cases where search is out of date and returns items that don't exist + try: + item_info = sf_hook.item_info(res['item_id']) + except: + # if the item fails to fetch item info, it probably doesn't exist, so can't be most recent + continue + # grab last modified, compare to current max known + item_last_modified = item_info['ProgenyEditDate'] + if max_timestamp is None or item_last_modified > max_timestamp: + max_timestamp = item_last_modified + chosen_file = [res] + # overwrite files list with singular chosen item + files = chosen_file + + + # for all files, move to local + num_successes = 0 + + for file in files: + + remote_file = os.path.join(file['file_path_ftp'], file['file_name']) + self.log.info("Attempting to get file " + remote_file) + + # lower filename and replace spaces with underscores + file['file_name'] = file['file_name'].lower().replace(' ', '_') + + # check to see if there is other metadata needed in local path and if not, add filename to local path + if file['parent_id'] == base_path_id: + full_local_path = os.path.join(self.local_path, file['file_name']) + else: + full_local_path = os.path.join(self.local_path, file['file_path_no_base'], file['file_name']) + + # create dir (works if there is a file name or not) + os.makedirs(os.path.dirname(full_local_path), exist_ok=True) + + # download the file + try: + sf_hook.download_to_disk(item_id=file['item_id'], local_path=full_local_path) + + if self.delete_remote: + sf_hook.delete(file['item_id']) - continue + num_successes += 1 + + except Exception as err: + self.log.error(f'Failed to get file with message: {err}') + + if slack_conn_id := context["dag"].user_defined_macros.get("slack_conn_id"): + slack.slack_alert_download_failure( + context=context, http_conn_id=slack_conn_id, + remote_path=remote_file, local_path=full_local_path, error=err + ) + + continue - if num_successes == 0: - raise AirflowException("Failed transfer from ShareFile to local: no files transferred successfully!") + if num_successes == 0: + raise AirflowException("Failed transfer from ShareFile to local: no files transferred successfully!") - return self.local_path + return self.local_path