From a5e1335299e3eb6b9171dd89ee3475d0aeb0e003 Mon Sep 17 00:00:00 2001 From: mhidas Date: Tue, 19 Jan 2021 11:28:57 +1100 Subject: [PATCH 1/8] update required version of aodncore --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 30f3ca25..bcd5d0ea 100644 --- a/setup.py +++ b/setup.py @@ -55,7 +55,7 @@ INSTALL_REQUIRES = [ 'aodntools>=1.3.1', # installed before aodncore due to more specific jsonschema dependency - 'aodncore>=1.0.0', + 'aodncore>=1.2.0', 'cc-plugin-imos>=1.3.0', 'fiona>=1.8.8', 'fuzzywuzzy>=0.18.0', # most used python fuzzy search finder From aa6f014ff69b288e5a5ef7d297136c15e7483b02 Mon Sep 17 00:00:00 2001 From: Leigh Gordon Date: Mon, 18 Jan 2021 10:55:42 +1100 Subject: [PATCH 2/8] Update test.yml Add fail-fast: false --- .github/workflows/test.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a4917f2d..8bf13aeb 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,6 +14,7 @@ jobs: runs-on: ubuntu-latest strategy: + fail-fast: false matrix: python-version: [ '3.5', '3.6' ] stage: [ 'build', 'rc', 'production'] From 30a9f88b50f4add90dad38368e6fbcbeb3271a66 Mon Sep 17 00:00:00 2001 From: mhidas Date: Thu, 14 Jan 2021 17:14:43 +1100 Subject: [PATCH 3/8] update moorings handlers to use new aodncore API for remote file downloads and file_collection additions --- aodndata/moorings/handlers.py | 6 ++---- aodndata/moorings/products_handler.py | 14 +++++--------- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/aodndata/moorings/handlers.py b/aodndata/moorings/handlers.py index 7639eb80..331d5a5d 100644 --- a/aodndata/moorings/handlers.py +++ b/aodndata/moorings/handlers.py @@ -59,9 +59,7 @@ def preprocess(self): self.logger.info("Burst-processing {f.name}".format(f=f)) product_path = create_burst_average_netcdf(f.src_path, self.products_dir) - product_file = PipelineFile(product_path, file_update_callback=self._file_update_callback) - product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD - self.file_collection.add(product_file) + self.add_to_collection(product_path, publish_type=PipelineFilePublishType.HARVEST_UPLOAD) def postprocess(self): """Set error_cleanup_regexes so that if the same file was uploaded previously and failed, it can now be @@ -116,6 +114,6 @@ def process(self): self.file_collection.set_publish_types(PipelineFilePublishType.NO_ACTION) self.input_file_object.publish_type = PipelineFilePublishType.HARVEST_UPLOAD - self.file_collection.add(self.input_file_object) + self.add_to_collection(self.input_file_object) dest_path = DwmFileClassifier.dest_path diff --git a/aodndata/moorings/products_handler.py b/aodndata/moorings/products_handler.py index 3acb167b..3d902ba0 100644 --- a/aodndata/moorings/products_handler.py +++ b/aodndata/moorings/products_handler.py @@ -201,7 +201,7 @@ def _get_input_files(self): # Download input files to local cache. self.logger.info("Downloading {n} input files".format(n=len(self.input_file_collection))) - self.input_file_collection.download(self._upload_store_runner.broker, self.temp_dir) + self.state_query.download(self.input_file_collection, self.temp_dir) # TODO: Replace temp_dir above with cache_dir? def _get_old_product_files(self): @@ -242,9 +242,7 @@ def _log_excluded_files(self, errors): def _add_to_collection(self, product_url): """Add a new product file to the file_collection to be harvested and uploaded.""" - product_file = PipelineFile(product_url, file_update_callback=self._file_update_callback) - product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD - self.file_collection.add(product_file) + self.add_to_collection(product_url, publish_type=PipelineFilePublishType.HARVEST_UPLOAD) def _input_list_for_variables(self, *variables): """Return a list of input files containing any of the given variables""" @@ -295,7 +293,6 @@ def _make_hourly_timeseries(self): # create two versions of the product, one with only good data (flags 1 & 2), # and one also including non-QC'd data (flag 0) for qc_flags in ((1, 2), (0, 1, 2)): - product_url, errors = hourly_aggregator(input_list, self.product_site_code, qc_flags, **self.product_common_kwargs) @@ -357,10 +354,9 @@ def _cleanup_previous_version(self, product_filename): if os.path.basename(old_product_url) != product_filename: # Add the previous version as a "late deletion". It will be deleted during the handler's `publish` # step after (and only if) all new files have been successfully published. - old_file = PipelineFile(old_product_url, dest_path=old_product_url, is_deletion=True, - late_deletion=True, file_update_callback=self._file_update_callback) - old_file.publish_type = PipelineFilePublishType.DELETE_UNHARVEST - self.file_collection.add(old_file) + self.add_to_collection(old_product_url, dest_path=old_product_url, + is_deletion=True, late_deletion=True, + publish_type=PipelineFilePublishType.DELETE_UNHARVEST) def preprocess(self): """If the input is a manifest file, collect available input files and From bc7b7097208a798a32394ac6f7d494d1533f21ca Mon Sep 17 00:00:00 2001 From: mhidas Date: Tue, 19 Jan 2021 11:33:26 +1100 Subject: [PATCH 4/8] add parameter type annotation in MooringsProductsHandler to remove type warnings in PyCharm --- aodndata/moorings/products_handler.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/aodndata/moorings/products_handler.py b/aodndata/moorings/products_handler.py index 3d902ba0..5c43890a 100644 --- a/aodndata/moorings/products_handler.py +++ b/aodndata/moorings/products_handler.py @@ -2,9 +2,11 @@ import os import re +from typing import Union, List + from owslib.fes import PropertyIsEqualTo, PropertyIsNotEqualTo, And, Or -from aodncore.pipeline import HandlerBase, PipelineFilePublishType, PipelineFile, FileType +from aodncore.pipeline import HandlerBase, PipelineFilePublishType, FileType from aodncore.pipeline.exceptions import (InvalidFileContentError, InvalidFileNameError, InvalidFileFormatError, MissingFileError, PipelineSystemError) from aodncore.pipeline.files import RemotePipelineFileCollection @@ -155,7 +157,7 @@ def product_common_kwargs(self): 'opendap_url_prefix': OPENDAP_URL_PREFIX } - def get_wfs_features(self, filter_list, propertyname='*'): + def get_wfs_features(self, filter_list, propertyname: Union[str, List[str]] = '*'): """Query the file index WFS layer with the given filters and return a list of features. :param filter_list: list of filters to apply (owslib.fes.OgcExpression instances) From b498a0aa13a070def24ae8b080d972998966d2f3 Mon Sep 17 00:00:00 2001 From: mhidas Date: Tue, 19 Jan 2021 16:37:46 +1100 Subject: [PATCH 5/8] use layer positional arg in query_wfs_getfeature_dict (introduced in https://github.com/aodn/python-aodncore/pull/228) --- aodndata/moorings/products_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aodndata/moorings/products_handler.py b/aodndata/moorings/products_handler.py index 5c43890a..3aa6f481 100644 --- a/aodndata/moorings/products_handler.py +++ b/aodndata/moorings/products_handler.py @@ -166,7 +166,7 @@ def get_wfs_features(self, filter_list, propertyname: Union[str, List[str]] = '* """ ogc_filter = ogc_filter_to_string(And(filter_list)) - wfs_response = self.state_query.query_wfs_getfeature_dict(typename=[self.FILE_INDEX_LAYER], + wfs_response = self.state_query.query_wfs_getfeature_dict(self.FILE_INDEX_LAYER, filter=ogc_filter, propertyname=propertyname ) From aa4fcbfee0ea8c07220a91ea0ae2e7e4b298e40b Mon Sep 17 00:00:00 2001 From: Leigh Gordon Date: Tue, 19 Jan 2021 17:08:28 +1100 Subject: [PATCH 6/8] Update soop_trv handler and tests to remove use of deprecated methods --- aodndata/soop/soop_trv.py | 16 ++++++---------- test_aodndata/soop/test_soop_trv.py | 2 +- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/aodndata/soop/soop_trv.py b/aodndata/soop/soop_trv.py index 4efde9c6..ca617e28 100644 --- a/aodndata/soop/soop_trv.py +++ b/aodndata/soop/soop_trv.py @@ -19,16 +19,12 @@ def preprocess(self): """ Files to be deleted as found in 'soop_trv_duplicate_url' wfs layer """ - files_to_delete = self.state_query.query_wfs_urls_for_layer('soop_trv_duplicate_url') - - for f in files_to_delete: - file_to_delete = PipelineFile(os.path.basename(f), - is_deletion=True, - dest_path=f, - file_update_callback=self._file_update_callback - ) - file_to_delete.publish_type = PipelineFilePublishType.DELETE_UNHARVEST - self.file_collection.add(file_to_delete) + duplicate_files = self.state_query.query_wfs_files('soop_trv_duplicate_url') + for duplicate in duplicate_files: + deletion = PipelineFile.from_remotepipelinefile(duplicate, + is_deletion=True, + publish_type=PipelineFilePublishType.DELETE_UNHARVEST) + self.add_to_collection(deletion) def get_main_soop_trv_var(self, filepath): with Dataset(filepath, mode='r') as netcdf_file_obj: diff --git a/test_aodndata/soop/test_soop_trv.py b/test_aodndata/soop/test_soop_trv.py index 7c54f2d4..eb985209 100644 --- a/test_aodndata/soop/test_soop_trv.py +++ b/test_aodndata/soop/test_soop_trv.py @@ -79,7 +79,7 @@ def test_delete_from_wfs(self, mock_webfeatureservice): broker = WfsBroker(self.config.pipeline_config['global']['wfs_url']) - files_for_layer = broker.query_urls_for_layer('soop_trv_duplicate_url') + files_for_layer = broker.query_files('soop_trv_duplicate_url') self.assertEqual(files_for_layer[0], 'IMOS/SOOP/SOOP-TRV/VMQ9273_Solander/By_Cruise/Cruise_START-20181205T035932Z_END-20181206T045722Z/temperature/IMOS_SOOP-TRV_T_20181205T035932Z_VMQ9273_FV01_END-20181206T045722Z.nc') From da4ebdd50e224cc4af83085d7ae8ef94d7302163 Mon Sep 17 00:00:00 2001 From: Leigh Gordon Date: Tue, 19 Jan 2021 17:09:14 +1100 Subject: [PATCH 7/8] Update aatams_sattag handler to remove use of private method --- aodndata/aatams/aatams_sattag.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/aodndata/aatams/aatams_sattag.py b/aodndata/aatams/aatams_sattag.py index 09c62642..8c708e02 100644 --- a/aodndata/aatams/aatams_sattag.py +++ b/aodndata/aatams/aatams_sattag.py @@ -54,14 +54,9 @@ def get_file(file_list, moniker, campaign=None): def get_remote_metadata_from_zip(self, remote_pfile): """Fetch the metdata.csv file from a RemotePipelineFile zip and wrapping it on a PipelineFile.""" - # TODO: remove the first try statement when pipeline remote_collection = RemotePipelineFileCollection(remote_pfile) - try: - download = self.state_query.download - except AttributeError: - download = self.state_query._storage_broker.download - download(remote_collection, self.temp_dir) + self.state_query.download(remote_collection, self.temp_dir) dest_folder = os.path.join( self.temp_dir, os.path.dirname(remote_pfile.dest_path) ) From 4b1e4dfdd1c15a198396e32715f1761dd8c5a3f9 Mon Sep 17 00:00:00 2001 From: mhidas Date: Thu, 21 Jan 2021 18:05:09 +1100 Subject: [PATCH 8/8] remove unnecessary creation of RemotePipelineFileCollection --- aodndata/aatams/aatams_sattag.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/aodndata/aatams/aatams_sattag.py b/aodndata/aatams/aatams_sattag.py index 8c708e02..15bf46e3 100644 --- a/aodndata/aatams/aatams_sattag.py +++ b/aodndata/aatams/aatams_sattag.py @@ -54,9 +54,7 @@ def get_file(file_list, moniker, campaign=None): def get_remote_metadata_from_zip(self, remote_pfile): """Fetch the metdata.csv file from a RemotePipelineFile zip and wrapping it on a PipelineFile.""" - remote_collection = RemotePipelineFileCollection(remote_pfile) - - self.state_query.download(remote_collection, self.temp_dir) + self.state_query.download(remote_pfile, self.temp_dir) dest_folder = os.path.join( self.temp_dir, os.path.dirname(remote_pfile.dest_path) )