1 change: 1 addition & 0 deletions .github/workflows/test.yml
```diff
@@ -14,6 +14,7 @@ jobs:
 
     runs-on: ubuntu-latest
     strategy:
+      fail-fast: false
       matrix:
         python-version: [ '3.5', '3.6' ]
         stage: [ 'build', 'rc', 'production']
```
9 changes: 1 addition & 8 deletions aodndata/aatams/aatams_sattag.py
```diff
@@ -54,14 +54,7 @@ def get_file(file_list, moniker, campaign=None):
     def get_remote_metadata_from_zip(self, remote_pfile):
         """Fetch the metadata.csv file from a RemotePipelineFile zip and wrap it in a
         PipelineFile."""
-        # TODO: remove the first try statement when pipeline
-        remote_collection = RemotePipelineFileCollection(remote_pfile)
-        try:
-            download = self.state_query.download
-        except AttributeError:
-            download = self.state_query._storage_broker.download
-
-        download(remote_collection, self.temp_dir)
+        self.state_query.download(remote_pfile, self.temp_dir)
         dest_folder = os.path.join(
             self.temp_dir, os.path.dirname(remote_pfile.dest_path)
         )
```
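The dropped try/except was a compatibility shim: it probed for `StateQuery.download` and fell back to the private `_storage_broker.download` on older aodncore. With the `aodncore>=1.2.0` bump in setup.py below, the method is assumed to exist, and (judging by this hunk and the products_handler.py one further down) it accepts either a single `RemotePipelineFile` or a collection. A minimal sketch of the assumed call forms:

```python
# Sketch only: both call forms are inferred from hunks in this PR (here and in
# products_handler.py below), not from aodncore documentation.
from aodncore.pipeline.files import RemotePipelineFileCollection


def fetch_remote_file(state_query, remote_pfile, temp_dir):
    # aodncore>=1.2.0 StateQuery.download is assumed to accept a single
    # RemotePipelineFile directly...
    state_query.download(remote_pfile, temp_dir)
    # ...as well as a RemotePipelineFileCollection, so the explicit wrapping
    # (and the AttributeError fallback to the private broker) can go.
    state_query.download(RemotePipelineFileCollection(remote_pfile), temp_dir)
```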
6 changes: 2 additions & 4 deletions aodndata/moorings/handlers.py
```diff
@@ -59,9 +59,7 @@ def preprocess(self):
 
             self.logger.info("Burst-processing {f.name}".format(f=f))
             product_path = create_burst_average_netcdf(f.src_path, self.products_dir)
-            product_file = PipelineFile(product_path, file_update_callback=self._file_update_callback)
-            product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
-            self.file_collection.add(product_file)
+            self.add_to_collection(product_path, publish_type=PipelineFilePublishType.HARVEST_UPLOAD)
 
     def postprocess(self):
         """Set error_cleanup_regexes so that if the same file was uploaded previously and failed, it can now be
```
```diff
@@ -116,6 +114,6 @@ def process(self):
 
         self.file_collection.set_publish_types(PipelineFilePublishType.NO_ACTION)
         self.input_file_object.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
-        self.file_collection.add(self.input_file_object)
+        self.add_to_collection(self.input_file_object)
 
     dest_path = DwmFileClassifier.dest_path
```
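Several hunks in this PR collapse the old three-step pattern (construct a `PipelineFile`, set its `publish_type`, add it to `file_collection`) into a single `add_to_collection` call. The aodncore implementation isn't shown here; a hedged reconstruction of the behaviour implied by the lines it replaces:

```python
# Hypothetical reconstruction of aodncore's HandlerBase.add_to_collection, inferred
# solely from the three-line pattern this PR replaces; the real aodncore>=1.2.0
# implementation may differ.
from aodncore.pipeline import PipelineFile


class HandlerBaseSketch:
    def add_to_collection(self, pipeline_file, publish_type=None, **kwargs):
        if not isinstance(pipeline_file, PipelineFile):
            # A local path or URL was passed: wrap it in a PipelineFile, attaching
            # the handler's progress callback as the old call sites did explicitly.
            pipeline_file = PipelineFile(pipeline_file,
                                         file_update_callback=self._file_update_callback,
                                         **kwargs)
        if publish_type is not None:
            pipeline_file.publish_type = publish_type
        self.file_collection.add(pipeline_file)
```

This shape covers all three call sites in the PR: a bare path plus `publish_type` (handlers.py, products_handler.py), extra kwargs such as `dest_path`/`is_deletion`/`late_deletion` (products_handler.py), and an already-built `PipelineFile` (soop_trv.py).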
22 changes: 10 additions & 12 deletions aodndata/moorings/products_handler.py
```diff
@@ -2,9 +2,11 @@
 import os
 import re
 
+from typing import Union, List
+
 from owslib.fes import PropertyIsEqualTo, PropertyIsNotEqualTo, And, Or
 
-from aodncore.pipeline import HandlerBase, PipelineFilePublishType, PipelineFile, FileType
+from aodncore.pipeline import HandlerBase, PipelineFilePublishType, FileType
 from aodncore.pipeline.exceptions import (InvalidFileContentError, InvalidFileNameError, InvalidFileFormatError,
                                           MissingFileError, PipelineSystemError)
 from aodncore.pipeline.files import RemotePipelineFileCollection
```
```diff
@@ -155,7 +157,7 @@ def product_common_kwargs(self):
             'opendap_url_prefix': OPENDAP_URL_PREFIX
         }
 
-    def get_wfs_features(self, filter_list, propertyname='*'):
+    def get_wfs_features(self, filter_list, propertyname: Union[str, List[str]] = '*'):
         """Query the file index WFS layer with the given filters and return a list of features.
 
         :param filter_list: list of filters to apply (owslib.fes.OgcExpression instances)
```
```diff
@@ -164,7 +166,7 @@ def get_wfs_features(self, filter_list, propertyname='*'):
         """
 
         ogc_filter = ogc_filter_to_string(And(filter_list))
-        wfs_response = self.state_query.query_wfs_getfeature_dict(typename=[self.FILE_INDEX_LAYER],
+        wfs_response = self.state_query.query_wfs_getfeature_dict(self.FILE_INDEX_LAYER,
                                                                   filter=ogc_filter,
                                                                   propertyname=propertyname
                                                                   )
```
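The new annotation makes explicit that `propertyname` may be a single property name or a list of names, matching what is forwarded to `query_wfs_getfeature_dict`. A usage sketch, assuming a `handler` instance is in scope; the filter property names and the site code are illustrative, not taken from this repo:

```python
# Illustrative only: 'site_code', 'url', 'size' and 'NRSMAI' are hypothetical values.
from owslib.fes import PropertyIsEqualTo, PropertyIsNotEqualTo

filters = [
    PropertyIsEqualTo(propertyname='site_code', literal='NRSMAI'),
    PropertyIsNotEqualTo(propertyname='url', literal=''),
]

features = handler.get_wfs_features(filters, propertyname='url')            # one property
features = handler.get_wfs_features(filters, propertyname=['url', 'size'])  # several properties
```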
```diff
@@ -201,7 +203,7 @@ def _get_input_files(self):
 
         # Download input files to local cache.
         self.logger.info("Downloading {n} input files".format(n=len(self.input_file_collection)))
-        self.input_file_collection.download(self._upload_store_runner.broker, self.temp_dir)
+        self.state_query.download(self.input_file_collection, self.temp_dir)
         # TODO: Replace temp_dir above with cache_dir?
 
     def _get_old_product_files(self):
```
```diff
@@ -242,9 +244,7 @@ def _log_excluded_files(self, errors):
 
     def _add_to_collection(self, product_url):
         """Add a new product file to the file_collection to be harvested and uploaded."""
-        product_file = PipelineFile(product_url, file_update_callback=self._file_update_callback)
-        product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
-        self.file_collection.add(product_file)
+        self.add_to_collection(product_url, publish_type=PipelineFilePublishType.HARVEST_UPLOAD)
 
     def _input_list_for_variables(self, *variables):
         """Return a list of input files containing any of the given variables"""
```
```diff
@@ -295,7 +295,6 @@ def _make_hourly_timeseries(self):
         # create two versions of the product, one with only good data (flags 1 & 2),
         # and one also including non-QC'd data (flag 0)
         for qc_flags in ((1, 2), (0, 1, 2)):
-
             product_url, errors = hourly_aggregator(input_list, self.product_site_code, qc_flags,
                                                     **self.product_common_kwargs)
 
```
```diff
@@ -357,10 +356,9 @@ def _cleanup_previous_version(self, product_filename):
             if os.path.basename(old_product_url) != product_filename:
                 # Add the previous version as a "late deletion". It will be deleted during the handler's `publish`
                 # step after (and only if) all new files have been successfully published.
-                old_file = PipelineFile(old_product_url, dest_path=old_product_url, is_deletion=True,
-                                        late_deletion=True, file_update_callback=self._file_update_callback)
-                old_file.publish_type = PipelineFilePublishType.DELETE_UNHARVEST
-                self.file_collection.add(old_file)
+                self.add_to_collection(old_product_url, dest_path=old_product_url,
+                                       is_deletion=True, late_deletion=True,
+                                       publish_type=PipelineFilePublishType.DELETE_UNHARVEST)
 
     def preprocess(self):
         """If the input is a manifest file, collect available input files and
```
16 changes: 6 additions & 10 deletions aodndata/soop/soop_trv.py
```diff
@@ -19,16 +19,12 @@ def preprocess(self):
         """
         Files to be deleted as found in 'soop_trv_duplicate_url' wfs layer
         """
-        files_to_delete = self.state_query.query_wfs_urls_for_layer('soop_trv_duplicate_url')
-
-        for f in files_to_delete:
-            file_to_delete = PipelineFile(os.path.basename(f),
-                                          is_deletion=True,
-                                          dest_path=f,
-                                          file_update_callback=self._file_update_callback
-                                          )
-            file_to_delete.publish_type = PipelineFilePublishType.DELETE_UNHARVEST
-            self.file_collection.add(file_to_delete)
+        duplicate_files = self.state_query.query_wfs_files('soop_trv_duplicate_url')
+        for duplicate in duplicate_files:
+            deletion = PipelineFile.from_remotepipelinefile(duplicate,
+                                                            is_deletion=True,
+                                                            publish_type=PipelineFilePublishType.DELETE_UNHARVEST)
+            self.add_to_collection(deletion)
 
     def get_main_soop_trv_var(self, filepath):
         with Dataset(filepath, mode='r') as netcdf_file_obj:
```
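Beyond the rename, the two query helpers appear to differ in return type: the old `query_wfs_urls_for_layer` yielded plain URL strings (the old loop derived `dest_path` from each string), while `query_wfs_files` seems to yield `RemotePipelineFile` objects (the review comment on the test below reads `files_for_layer[0].dest_path`). A hedged sketch of the difference, assuming a `state_query` instance is in scope:

```python
# Return types assumed from this diff and the review comment at the bottom of the
# page, not from aodncore documentation.
urls = state_query.query_wfs_urls_for_layer('soop_trv_duplicate_url')  # old API
first = urls[0]                        # a plain string such as 'IMOS/SOOP/...nc'

remote_files = state_query.query_wfs_files('soop_trv_duplicate_url')   # new API
first = remote_files[0].dest_path      # the same path, as an attribute
```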
2 changes: 1 addition & 1 deletion setup.py
```diff
@@ -55,7 +55,7 @@
 
 INSTALL_REQUIRES = [
     'aodntools>=1.3.1',  # installed before aodncore due to more specific jsonschema dependency
-    'aodncore>=1.0.0',
+    'aodncore>=1.2.0',
     'cc-plugin-imos>=1.3.0',
     'fiona>=1.8.8',
     'fuzzywuzzy>=0.18.0',  # most used python fuzzy search finder
```
2 changes: 1 addition & 1 deletion test_aodndata/soop/test_soop_trv.py
```diff
@@ -79,7 +79,7 @@ def test_delete_from_wfs(self, mock_webfeatureservice):
 
         broker = WfsBroker(self.config.pipeline_config['global']['wfs_url'])
 
-        files_for_layer = broker.query_urls_for_layer('soop_trv_duplicate_url')
+        files_for_layer = broker.query_files('soop_trv_duplicate_url')
         self.assertEqual(files_for_layer[0],
                          'IMOS/SOOP/SOOP-TRV/VMQ9273_Solander/By_Cruise/Cruise_START-20181205T035932Z_END-20181206T045722Z/temperature/IMOS_SOOP-TRV_T_20181205T035932Z_VMQ9273_FV01_END-20181206T045722Z.nc')
```
Comment on lines +82 to 84 (Contributor Author):
@lwgordonimos Should this test be creating a WfsBroker instead of using the new state_query API? e.g.

Suggested change:

```diff
-        files_for_layer = broker.query_files('soop_trv_duplicate_url')
-        self.assertEqual(files_for_layer[0],
-                         'IMOS/SOOP/SOOP-TRV/VMQ9273_Solander/By_Cruise/Cruise_START-20181205T035932Z_END-20181206T045722Z/temperature/IMOS_SOOP-TRV_T_20181205T035932Z_VMQ9273_FV01_END-20181206T045722Z.nc')
+        handler = self.handler_class(GOOD_NC)
+        files_for_layer = handler.state_query.query_wfs_files('soop_trv_duplicate_url')
+        self.assertEqual(files_for_layer[0].dest_path,
+                         'IMOS/SOOP/SOOP-TRV/VMQ9273_Solander/By_Cruise/Cruise_START-20181205T035932Z_END-20181206T045722Z/temperature/IMOS_SOOP-TRV_T_20181205T035932Z_VMQ9273_FV01_END-20181206T045722Z.nc')
```

In fact, this is already implicitly tested by test_good_netcdf above, so this one is kind of redundant.
@lbesnard

