3 changes: 3 additions & 0 deletions .gitignore
@@ -2,6 +2,9 @@
data
sample_data

# local environment overrides — see dev.env.example for the schema
dev.env

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
6 changes: 0 additions & 6 deletions dev.env

This file was deleted.

24 changes: 24 additions & 0 deletions dev.env.example
@@ -0,0 +1,24 @@
# Copy this file to dev.env and fill in the secrets locally.
# dev.env is gitignored; never commit real keys.

INPUT_DIR=/data/input
OUTPUT_DIR=/data/output
ENVIRONMENT=local
PENNSIEVE_API_HOST=https://api.pennsieve.net
PENNSIEVE_API_HOST2=https://api2.pennsieve.net
IMPORTER_ENABLED=true

# Pennsieve API credentials (generated in the Pennsieve UI under Personal API Tokens).
PENNSIEVE_API_KEY=
PENNSIEVE_API_SECRET=

# Workflow instance / integration UUID. Leave blank for ad-hoc local runs;
# a UUID is generated on the fly. Set to a real id when triggered by the runner.
INTEGRATION_ID=

# Per-converter pipeline name (mef-asset, edf-asset, etc.) recorded
# on viewer_asset rows. Each deployment overrides this.
ASSET_NAME=mef-asset

# Set true to fall back to the pre-viewer-asset upload flow.
LEGACY_IMPORT_FLOW=false
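
How dev.env gets loaded for a local run isn't shown in this diff (it could be docker's --env-file, python-dotenv, or similar). As a minimal sketch, a dependency-free loader might look like the following; the load_dev_env helper is hypothetical, not part of this PR:

import os

def load_dev_env(path: str = "dev.env") -> None:
    # Read KEY=VALUE lines into os.environ, skipping comments and blanks.
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            key, _, value = line.partition("=")
            # Variables already set in the real environment take precedence.
            os.environ.setdefault(key.strip(), value.strip())

load_dev_env()
print(os.environ["PENNSIEVE_API_HOST"])  # https://api.pennsieve.net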
109 changes: 109 additions & 0 deletions processor/asset_uploader.py
@@ -0,0 +1,109 @@
"""S3 chunk uploader using STS credentials returned by packages-service.

The credentials are short-lived (~1 hour). For very long uploads,
this will need a refresh mechanism (see packages-service POST
/assets/{id}/upload-credentials, not yet implemented).
"""

import logging
import os
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass

import boto3
import botocore.exceptions
from clients.packages_assets_client import UploadCredentials

log = logging.getLogger()


@dataclass
class ChunkUploadResult:
"""Per-file upload outcome — used downstream to register ranges."""

relative_key: str # the chunk's path relative to the asset's prefix
full_key: str # bucket-relative S3 key (key_prefix + relative_key)


class AssetUploader:
"""Uploads files to a viewer-asset's S3 prefix using STS credentials."""

def __init__(self, credentials: UploadCredentials, max_workers: int = 4):
self._creds = credentials
self._max_workers = max_workers

def _build_client(self):
"""Create a fresh boto3 S3 client bound to the STS session."""
return boto3.client(
"s3",
aws_access_key_id=self._creds.access_key_id,
aws_secret_access_key=self._creds.secret_access_key,
aws_session_token=self._creds.session_token,
region_name=self._creds.region,
)

def upload_files(
self,
files: list[tuple[str, str]],
) -> list[ChunkUploadResult]:
"""Upload local files in parallel.

Args:
files: list of (local_path, relative_key) tuples. The relative
key is the path UNDER the asset's prefix; this method
prepends key_prefix when forming the full S3 key.

Returns:
list of ChunkUploadResult in input order.

Raises:
botocore.exceptions.ClientError on any failure (including
ExpiredToken). No partial-success aggregation: if any file
fails, the exception propagates and the orchestrator should
treat the asset as failed.
"""
s3 = self._build_client()
results: list[ChunkUploadResult] = [None] * len(files) # type: ignore[list-item]

def _put(index: int, local_path: str, relative_key: str) -> None:
full_key = self._creds.key_prefix + relative_key
try:
s3.upload_file(local_path, self._creds.bucket, full_key)
except botocore.exceptions.ClientError as e:
log.error(
"failed to upload %s to s3://%s/%s: %s",
local_path,
self._creds.bucket,
full_key,
e,
)
raise
results[index] = ChunkUploadResult(relative_key=relative_key, full_key=full_key)

with ThreadPoolExecutor(max_workers=self._max_workers) as executor:
futures = [executor.submit(_put, i, local, relative) for i, (local, relative) in enumerate(files)]
# Wait for all; the first failure in input order propagates.
for future in futures:
future.result()

log.info(
"uploaded %d files to s3://%s/%s",
len(files),
self._creds.bucket,
self._creds.key_prefix,
)
return results


def relative_key_for_chunk(chunk_filename: str) -> str:
"""Map a chunk file in the local output dir to its S3 relative key.

Today's writer produces files named:
channel-{idx:05d}_{start_us}_{end_us}.bin.gz
or after channel-id substitution in importer:
{channel_node_id}_{start_us}_{end_us}.bin.gz

The relative key is just the basename — no nested directories.
timeseries-service validates this is a "safe relative path".
"""
return os.path.basename(chunk_filename)
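
The orchestration around AssetUploader isn't part of this diff. Here is a hedged sketch of how a caller might wire it up, assuming credentials arrive as the UploadCredentials object referenced above; the upload_chunks function and the import path are illustrative only:

import glob

from processor.asset_uploader import AssetUploader, relative_key_for_chunk

def upload_chunks(credentials, output_dir: str):
    # Pair each chunk file with its S3 key relative to the asset prefix.
    chunk_paths = sorted(glob.glob(f"{output_dir}/*.bin.gz"))
    files = [(path, relative_key_for_chunk(path)) for path in chunk_paths]

    uploader = AssetUploader(credentials, max_workers=4)
    # Any ClientError (including ExpiredToken on uploads that outlive the
    # ~1 hour credential lifetime) propagates; the caller should fail the
    # whole asset rather than register a partial set of ranges.
    return uploader.upload_files(files)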
7 changes: 7 additions & 0 deletions processor/clients/__init__.py
@@ -5,7 +5,14 @@
from .base_client import SessionManager as SessionManager
from .import_client import ImportClient as ImportClient
from .import_client import ImportFile as ImportFile
from .packages_assets_client import CreatedAsset as CreatedAsset
from .packages_assets_client import PackagesAssetsClient as PackagesAssetsClient
from .packages_assets_client import UploadCredentials as UploadCredentials
from .packages_assets_client import ViewerAsset as ViewerAsset
from .packages_client import PackagesClient as PackagesClient
from .timeseries_client import TimeSeriesClient as TimeSeriesClient
from .timeseries_ranges_client import CreateRangesResult as CreateRangesResult
from .timeseries_ranges_client import RangeChunk as RangeChunk
from .timeseries_ranges_client import TimeSeriesRangesClient as TimeSeriesRangesClient
from .workflow_client import WorkflowClient as WorkflowClient
from .workflow_client import WorkflowInstance as WorkflowInstance
4 changes: 1 addition & 3 deletions processor/clients/authentication_client.py
@@ -154,7 +154,5 @@ def refresh(self) -> str:
self._session_token = self._cognito.refresh_token(self._refresh_token, self._session_token)
else:
log.info("no refresh token, re-authenticating with API key/secret")
self._session_token, self._refresh_token = self._cognito.authenticate(
self._api_key, self._api_secret
)
self._session_token, self._refresh_token = self._cognito.authenticate(self._api_key, self._api_secret)
return self._session_token
17 changes: 17 additions & 0 deletions processor/clients/base_client.py
@@ -5,6 +5,23 @@
log = logging.getLogger()


# (connect_timeout_seconds, read_timeout_seconds)
DEFAULT_TIMEOUT = (5, 30)


def _is_client_error(exc: requests.RequestException) -> bool:
"""Tells the backoff library when to give up.

Give up on 4xx HTTP responses (the request itself is wrong; a retry
won't change the answer). Keep retrying everything else: 5xx, plus
connection-level errors with no response at all (timeouts, refused
connections, DNS failures).
"""
if isinstance(exc, requests.HTTPError) and exc.response is not None:
return 400 <= exc.response.status_code < 500
return False


# encapsulates a shared API session and token refresh
class SessionManager:
def __init__(self, auth_provider):
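
For context on how _is_client_error is consumed: backoff calls the giveup predicate with the raised exception and stops retrying when it returns True. A sketch of the intended wiring follows; the decorated get method and the retry parameters are illustrative, not taken from this diff:

import backoff
import requests

class SessionManager:
    @backoff.on_exception(
        backoff.expo,               # exponential wait between attempts
        requests.RequestException,  # retry any requests-level error...
        giveup=_is_client_error,    # ...but give up immediately on 4xx
        max_tries=5,
    )
    def get(self, url: str) -> requests.Response:
        response = self._session.get(url, timeout=DEFAULT_TIMEOUT)
        response.raise_for_status()  # turn 4xx/5xx into HTTPError
        return response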