1 change: 1 addition & 0 deletions .github/workflows/test.yml

@@ -14,6 +14,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
+      fail-fast: false
       matrix:
         python-version: [ '3.5', '3.6' ]
         stage: [ 'build', 'rc', 'production']
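With fail-fast disabled, one failing matrix job no longer cancels its siblings, so every run reports results for all six Python-version x stage combinations (2 versions x 3 stages) instead of stopping at the first failure.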
7 changes: 3 additions & 4 deletions aodndata/moorings/handlers.py

@@ -59,9 +59,8 @@ def preprocess(self):
             self.logger.info("Burst-processing {f.name}".format(f=f))
             product_path = create_burst_average_netcdf(f.src_path, self.products_dir)
-            product_file = PipelineFile(product_path, file_update_callback=self._file_update_callback)
-            product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
-            self.file_collection.add(product_file)
+            product_file = PipelineFile(product_path, publish_type=PipelineFilePublishType.HARVEST_UPLOAD)
+            self.add_pipelinefile(product_file)

     def postprocess(self):
         """Set error_cleanup_regexes so that if the same file was uploaded previously and failed, it can now be
@@ -116,6 +115,6 @@ def process(self):
         self.file_collection.set_publish_types(PipelineFilePublishType.NO_ACTION)
         self.input_file_object.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
-        self.file_collection.add(self.input_file_object)
+        self.add_pipelinefile(self.input_file_object)

     dest_path = DwmFileClassifier.dest_path
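Both hunks in this file collapse the earlier three-step pattern (construct the PipelineFile, set its publish_type, add it to file_collection) into a constructor kwarg plus a call to add_pipelinefile(); the same consolidation appears in products_handler.py below. A minimal sketch of what such a helper would need to do, inferred from the removed boilerplate rather than taken from the actual aodncore source:

    # Hypothetical handler helper, inferred from the removed boilerplate;
    # not the actual aodncore implementation.
    def add_pipelinefile(self, pipeline_file):
        """Attach the handler's progress callback and queue the file for publishing."""
        pipeline_file.file_update_callback = self._file_update_callback
        self.file_collection.add(pipeline_file)

If the helper wires up the callback itself, that would explain why the file_update_callback argument disappears from every call site in this PR.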
15 changes: 7 additions & 8 deletions aodndata/moorings/products_handler.py

@@ -201,7 +201,7 @@ def _get_input_files(self):
         # Download input files to local cache.
         self.logger.info("Downloading {n} input files".format(n=len(self.input_file_collection)))
-        self.input_file_collection.download(self._upload_store_runner.broker, self.temp_dir)
+        self.state_query.download(self.input_file_collection, self.temp_dir)
         # TODO: Replace temp_dir above with cache_dir?

     def _get_old_product_files(self):
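This change swaps a reach into the private _upload_store_runner.broker for the handler's public state_query interface. Note the inverted call shape: the collection was previously the receiver (collection.download(broker, dir)) and is now the first argument (state_query.download(collection, dir)).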
@@ -242,9 +242,8 @@ def _log_excluded_files(self, errors):
     def _add_to_collection(self, product_url):
         """Add a new product file to the file_collection to be harvested and uploaded."""
-        product_file = PipelineFile(product_url, file_update_callback=self._file_update_callback)
-        product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
-        self.file_collection.add(product_file)
+        product_file = PipelineFile(product_url, publish_type=PipelineFilePublishType.HARVEST_UPLOAD)
+        self.add_pipelinefile(product_file)

     def _input_list_for_variables(self, *variables):
         """Return a list of input files containing any of the given variables"""
@@ -357,10 +356,10 @@ def _cleanup_previous_version(self, product_filename):
             if os.path.basename(old_product_url) != product_filename:
                 # Add the previous version as a "late deletion". It will be deleted during the handler's `publish`
                 # step after (and only if) all new files have been successfully published.
-                old_file = PipelineFile(old_product_url, dest_path=old_product_url, is_deletion=True,
-                                        late_deletion=True, file_update_callback=self._file_update_callback)
-                old_file.publish_type = PipelineFilePublishType.DELETE_UNHARVEST
-                self.file_collection.add(old_file)
+                old_file = PipelineFile(old_product_url, dest_path=old_product_url,
+                                        is_deletion=True, late_deletion=True,
+                                        publish_type=PipelineFilePublishType.DELETE_UNHARVEST)
+                self.add_pipelinefile(old_file)

     def preprocess(self):
         """If the input is a manifest file, collect available input files and
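For readers unfamiliar with late deletions: the comment in the hunk above describes an ordering guarantee in the handler's publish step. A rough sketch of that flow, with _publish_new_files and _delete_and_unharvest as illustrative placeholders rather than real aodncore methods:

    # Illustrative only: the method names below are placeholders, not aodncore API.
    def publish(self):
        self._publish_new_files()  # harvest/upload all new files first
        # reached only if everything above succeeded, so it is now safe
        # to remove the superseded product versions
        for pipeline_file in self.file_collection:
            if pipeline_file.late_deletion:
                self._delete_and_unharvest(pipeline_file)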