Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ The processor generates two types of files per channel:
| `OUTPUT_DIR` | Directory for output files | - |
| `CHUNK_SIZE_MB` | Size of each data chunk in MB | `1` |
| `IMPORTER_ENABLED` | Enable Pennsieve upload | `false` |
| `PENNSIEVE_API_KEY` | Pennsieve API key | - |
| `PENNSIEVE_API_SECRET` | Pennsieve API secret | - |
| `SESSION_TOKEN` | Pennsieve session token | - |
| `REFRESH_TOKEN` | Pennsieve refresh token | - |
| `PENNSIEVE_API_HOST` | Pennsieve API endpoint | `https://api.pennsieve.net` |
| `PENNSIEVE_API_HOST2` | Pennsieve API2 endpoint | `https://api2.pennsieve.net` |
| `INTEGRATION_ID` | Workflow instance ID | - |
Expand Down
4 changes: 3 additions & 1 deletion processor/clients/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from .authentication_client import AuthenticationClient as AuthenticationClient
from .authentication_client import AuthProvider as AuthProvider
from .authentication_client import KeySecretAuthProvider as KeySecretAuthProvider
from .authentication_client import TokenAuthProvider as TokenAuthProvider
from .base_client import BaseClient as BaseClient
from .base_client import SessionManager as SessionManager
from .import_client import ImportClient as ImportClient
Expand Down
172 changes: 142 additions & 30 deletions processor/clients/authentication_client.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,160 @@
import base64
import json
import logging
from abc import ABC, abstractmethod

import boto3
import requests

log = logging.getLogger()


class AuthenticationClient:
class AuthProvider(ABC):
"""Interface for authentication strategies.

All auth methods ultimately produce a session token and the ability to
refresh it. Implementations differ only in how they bootstrap.
"""

@abstractmethod
def get_session_token(self) -> str:
"""Return the current session token."""
...

@abstractmethod
def refresh(self) -> str:
"""Refresh and return a new session token."""
...


class CognitoClient:
"""Shared Cognito interaction logic used by all auth providers."""

def __init__(self, api_host):
self.api_host = api_host
self._cognito_config = None

def _get_cognito_config(self):
if self._cognito_config is not None:
return self._cognito_config

def authenticate(self, api_key, api_secret):
url = f"{self.api_host}/authentication/cognito-config"
response = requests.get(url)
response.raise_for_status()
data = json.loads(response.content)

try:
response = requests.get(url)
response.raise_for_status()
data = json.loads(response.content)
self._cognito_config = {
"app_client_id": data["userPool"]["appClientId"],
"region": data["region"],
}
return self._cognito_config

cognito_app_client_id = data["tokenPool"]["appClientId"]
cognito_region = data["region"]
def _get_idp_client(self):
config = self._get_cognito_config()
return boto3.client(
"cognito-idp",
region_name=config["region"],
aws_access_key_id="",
aws_secret_access_key="",
)

cognito_idp_client = boto3.client(
"cognito-idp",
region_name=cognito_region,
aws_access_key_id="",
aws_secret_access_key="",
)
def authenticate(self, api_key, api_secret):
"""Exchange API key/secret for session + refresh tokens via Cognito USER_PASSWORD_AUTH."""
config = self._get_cognito_config()
idp_client = self._get_idp_client()

login_response = cognito_idp_client.initiate_auth(
AuthFlow="USER_PASSWORD_AUTH",
AuthParameters={"USERNAME": api_key, "PASSWORD": api_secret},
ClientId=cognito_app_client_id,
)
login_response = idp_client.initiate_auth(
AuthFlow="USER_PASSWORD_AUTH",
AuthParameters={"USERNAME": api_key, "PASSWORD": api_secret},
ClientId=config["app_client_id"],
)

auth_result = login_response["AuthenticationResult"]
return auth_result["AccessToken"], auth_result["RefreshToken"]

@staticmethod
def _decode_token(token):
"""Decode a JWT payload without verification (for extracting claims like device_key)."""
payload = token.split(".")[1]
# JWT base64url encoding may lack padding
padding = 4 - len(payload) % 4
if padding != 4:
payload += "=" * padding
return json.loads(base64.urlsafe_b64decode(payload))

def refresh_token(self, refresh_token, session_token=None):
"""Use a refresh token to obtain a new access token via Cognito REFRESH_TOKEN_AUTH."""
config = self._get_cognito_config()
idp_client = self._get_idp_client()

auth_parameters = {"REFRESH_TOKEN": refresh_token}

device_key = None
if session_token:
try:
decoded = self._decode_token(session_token)
device_key = decoded.get("device_key")
if device_key:
log.info(f"extracted device_key from session token: {device_key}")
except Exception as e:
log.warning(f"failed to extract device_key from session token: {e}")

access_token = login_response["AuthenticationResult"]["AccessToken"]
return access_token
except requests.HTTPError as e:
log.error(f"failed to reach authentication server with error: {e}")
raise e
except json.JSONDecodeError as e:
log.error(f"failed to decode authentication response with error: {e}")
raise e
except Exception as e:
log.error(f"failed to authenticate with error: {e}")
raise e
if device_key:
auth_parameters["DEVICE_KEY"] = device_key

response = idp_client.initiate_auth(
AuthFlow="REFRESH_TOKEN_AUTH",
AuthParameters=auth_parameters,
ClientId=config["app_client_id"],
)

return response["AuthenticationResult"]["AccessToken"]


class TokenAuthProvider(AuthProvider):
"""Auth provider for pre-supplied session + refresh tokens (production path)."""

def __init__(self, api_host, session_token, refresh_token):
self._session_token = session_token
self._refresh_token = refresh_token
self._cognito = CognitoClient(api_host)

def get_session_token(self) -> str:
return self._session_token

def refresh(self) -> str:
if not self._refresh_token:
raise RuntimeError("cannot refresh session: no refresh token available")
log.info("refreshing session token using refresh token")
self._session_token = self._cognito.refresh_token(self._refresh_token, self._session_token)
return self._session_token


class KeySecretAuthProvider(AuthProvider):
"""Auth provider that authenticates with API key/secret (local development path).

Authenticates eagerly on construction to obtain session + refresh tokens,
then refreshes using the same Cognito refresh flow as TokenAuthProvider.
"""

def __init__(self, api_host, api_key, api_secret):
self._api_key = api_key
self._api_secret = api_secret
self._cognito = CognitoClient(api_host)

log.info("authenticating with API key/secret")
self._session_token, self._refresh_token = self._cognito.authenticate(api_key, api_secret)

def get_session_token(self) -> str:
return self._session_token

def refresh(self) -> str:
if self._refresh_token:
log.info("refreshing session token using refresh token")
self._session_token = self._cognito.refresh_token(self._refresh_token, self._session_token)
else:
log.info("no refresh token, re-authenticating with API key/secret")
self._session_token, self._refresh_token = self._cognito.authenticate(
self._api_key, self._api_secret
)
return self._session_token
17 changes: 5 additions & 12 deletions processor/clients/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,17 @@
log = logging.getLogger()


# encapsulates a shared API session and re-authentication functionality
# encapsulates a shared API session and token refresh
class SessionManager:
def __init__(self, authentication_client, api_key, api_secret):
self.authentication_client = authentication_client
self.api_key = api_key
self.api_secret = api_secret

self.__session_token = None
def __init__(self, auth_provider):
self._auth_provider = auth_provider

@property
def session_token(self):
if self.__session_token is None:
self.refresh_session()

return self.__session_token
return self._auth_provider.get_session_token()

def refresh_session(self):
self.__session_token = self.authentication_client.authenticate(self.api_key, self.api_secret)
self._auth_provider.refresh()


class BaseClient:
Expand Down
2 changes: 1 addition & 1 deletion processor/clients/workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self, api_host, session_manager):
# with an empty body even when a workflow instance does not exist
@BaseClient.retry_with_refresh
def get_workflow_instance(self, workflow_instance_id):
url = f"{self.api_host}/compute/workflows/instances/{workflow_instance_id}"
url = f"{self.api_host}/compute/workflows/runs/{workflow_instance_id}"

headers = {"Accept": "application/json", "Authorization": f"Bearer {self.session_manager.session_token}"}

Expand Down
5 changes: 5 additions & 0 deletions processor/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import logging
import os
import uuid

log = logging.getLogger()


class Config:
def __init__(self):
Expand All @@ -15,6 +18,8 @@ def __init__(self):
# has been converted to use a different variable to represent the workflow instance ID
self.WORKFLOW_INSTANCE_ID = os.getenv("INTEGRATION_ID", str(uuid.uuid4()))

self.SESSION_TOKEN = os.getenv("SESSION_TOKEN")
self.REFRESH_TOKEN = os.getenv("REFRESH_TOKEN")
self.API_KEY = os.getenv("PENNSIEVE_API_KEY")
self.API_SECRET = os.getenv("PENNSIEVE_API_SECRET")
self.API_HOST = os.getenv("PENNSIEVE_API_HOST", "https://api.pennsieve.net")
Expand Down
10 changes: 3 additions & 7 deletions processor/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@
import backoff
import requests
from clients import (
AuthenticationClient,
ImportClient,
ImportFile,
PackagesClient,
SessionManager,
TimeSeriesClient,
WorkflowClient,
)
Expand All @@ -34,7 +32,9 @@
"""


def import_timeseries(api_host, api2_host, api_key, api_secret, workflow_instance_id, file_directory):
def import_timeseries(
api_host, api2_host, session_manager, workflow_instance_id, file_directory
):
# gather all the time series files from the output directory
timeseries_data_files = []
timeseries_channel_files = []
Expand All @@ -50,10 +50,6 @@ def import_timeseries(api_host, api2_host, api_key, api_secret, workflow_instanc
log.info("no time series channels or data")
return None

# authentication against the Pennsieve API
authorization_client = AuthenticationClient(api_host)
session_manager = SessionManager(authorization_client, api_key, api_secret)

# fetch workflow instance for parameters (dataset_id, package_id, etc.)
workflow_client = WorkflowClient(api2_host, session_manager)
workflow_instance = workflow_client.get_workflow_instance(workflow_instance_id)
Expand Down
17 changes: 14 additions & 3 deletions processor/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,22 @@
# note: this will be moved to a separated post-processor once the analysis pipeline is more
# easily able to handle > 3 processors
if config.IMPORTER_ENABLED:
importer = import_timeseries(
from clients.authentication_client import KeySecretAuthProvider, TokenAuthProvider
from clients.base_client import SessionManager

if config.SESSION_TOKEN:
auth_provider = TokenAuthProvider(config.API_HOST, config.SESSION_TOKEN, config.REFRESH_TOKEN)
elif config.API_KEY and config.API_SECRET:
auth_provider = KeySecretAuthProvider(config.API_HOST, config.API_KEY, config.API_SECRET)
else:
raise RuntimeError("no authentication credentials provided: set SESSION_TOKEN or API_KEY/API_SECRET")

session_manager = SessionManager(auth_provider)

import_timeseries(
config.API_HOST,
config.API_HOST2,
config.API_KEY,
config.API_SECRET,
session_manager,
config.WORKFLOW_INSTANCE_ID,
config.OUTPUT_DIR,
)
8 changes: 0 additions & 8 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,6 @@ def mock_session_manager():
return manager


@pytest.fixture
def mock_authentication_client():
"""Mock authentication client."""
client = Mock()
client.authenticate = Mock(return_value="mock-access-token")
return client


@pytest.fixture
def sample_timestamps():
"""Sample evenly-spaced timestamps at 1000 Hz."""
Expand Down
Loading