Skip to content
Merged
4 changes: 4 additions & 0 deletions python/lib/sift_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,18 @@ async def main():
5. **Use type hints** to get full IDE support and catch errors early
"""

from __future__ import annotations

import logging

from sift_client.client import SiftClient
from sift_client.config import config
from sift_client.transport import SiftConnectionConfig

__all__ = [
"SiftClient",
"SiftConnectionConfig",
"config",
]

logging.getLogger(__name__).addHandler(logging.NullHandler())
1 change: 1 addition & 0 deletions python/lib/sift_client/_internal/sync_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def generate_sync_api(cls: type[ResourceBase], sync_name: str) -> type:
@wraps(orig_init)
def __init__(self, *args, **kwargs): # noqa: N807
self._async_impl = cls(*args, **kwargs)
self._async_impl._is_sync = True
Comment thread
alexluck-sift marked this conversation as resolved.

def _run(self, coro):
loop = self._async_impl.client.get_asyncio_loop()
Expand Down
31 changes: 26 additions & 5 deletions python/lib/sift_client/_internal/util/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import zipfile
from typing import TYPE_CHECKING

from alive_progress import alive_bar # type: ignore[import-untyped]

from sift_client.errors import SiftWarning

if TYPE_CHECKING:
Expand All @@ -12,13 +14,21 @@
from sift_client.transport.rest_transport import RestClient


def download_file(signed_url: str, output_path: Path, *, rest_client: RestClient) -> Path:
def download_file(
signed_url: str,
output_path: Path,
*,
rest_client: RestClient,
show_progress: bool = False,
) -> Path:
"""Download a file from a URL in streaming 4 MiB chunks.

Args:
url: The URL to download from.
dest: Path where the file will be saved. Parent directories are created if needed.
rest_client: The SDK rest client to use for the download.
show_progress: If True, display a progress bar during download.
Defaults to False.

Returns:
The path to the downloaded file.
Expand All @@ -30,10 +40,21 @@ def download_file(signed_url: str, output_path: Path, *, rest_client: RestClient
# Strip the session's default Authorization header, presigned URLs carry their own auth
with rest_client.get(signed_url, stream=True, headers={"Authorization": None}) as response:
response.raise_for_status()
with output_path.open("wb") as file:
for chunk in response.iter_content(chunk_size=4194304): # 4 MiB
if chunk:
file.write(chunk)
total_bytes = int(response.headers.get("Content-Length", 0)) or None
with alive_bar(
total_bytes,
title="Downloading",
spinner="dots_waves",
spinner_length=7,
unit="B",
scale="SI",
disable=not show_progress,
) as bar:
with output_path.open("wb") as file:
for chunk in response.iter_content(chunk_size=4194304): # 4 MiB
if chunk:
file.write(chunk)
bar(len(chunk))
return output_path


Expand Down
71 changes: 71 additions & 0 deletions python/lib/sift_client/_tests/resources/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Error handling and edge cases
"""

import asyncio
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, MagicMock, patch

Expand Down Expand Up @@ -393,6 +394,24 @@ async def test_raises_timeout_error_when_not_complete_in_time(self, jobs_api_asy
timeout_secs=0.1,
)

@pytest.mark.asyncio
async def test_concurrent_wait_with_progress_disabled(self, jobs_api_async):
"""Concurrent wait_until_complete calls with show_progress=False should not raise."""
mock_job = MagicMock()
mock_job.job_status = JobStatus.FINISHED

with patch(
"sift_client.resources.jobs.JobsAPIAsync.get",
new_callable=AsyncMock,
return_value=mock_job,
):
results = await asyncio.gather(
jobs_api_async.wait_until_complete(job="job-1", show_progress=False),
jobs_api_async.wait_until_complete(job="job-2", show_progress=False),
)

assert all(r.job_status == JobStatus.FINISHED for r in results)

class TestJobProperties:
"""Tests for job property methods."""

Expand Down Expand Up @@ -527,6 +546,58 @@ def test_basic_list(self, jobs_api_sync):
if jobs:
assert isinstance(jobs[0], Job)

class TestWaitUntilComplete:
"""Tests for wait_until_complete through the sync wrapper."""

def test_wait_defaults_to_progress_enabled(self, jobs_api_sync):
"""Sync wrapper defaults to show_progress=True when no kwarg is passed."""
mock_job = MagicMock()
mock_job.job_status = JobStatus.FINISHED

with patch(
"sift_client.resources.jobs.JobsAPIAsync.get",
new_callable=AsyncMock,
return_value=mock_job,
):
result = jobs_api_sync.wait_until_complete(job="job-1")

assert result.job_status == JobStatus.FINISHED

def test_wait_with_progress_explicit_false(self, jobs_api_sync):
"""Explicit show_progress=False overrides the sync default."""
mock_job = MagicMock()
mock_job.job_status = JobStatus.FINISHED

with patch(
"sift_client.resources.jobs.JobsAPIAsync.get",
new_callable=AsyncMock,
return_value=mock_job,
):
result = jobs_api_sync.wait_until_complete(job="job-1", show_progress=False)

assert result.job_status == JobStatus.FINISHED

def test_namespace_override_disables_progress(self, jobs_api_sync):
"""Setting sift_client.config.show_progress=False overrides the sync default."""
import sift_client

mock_job = MagicMock()
mock_job.job_status = JobStatus.FINISHED

original = sift_client.config.show_progress
try:
sift_client.config.show_progress = False
with patch(
"sift_client.resources.jobs.JobsAPIAsync.get",
new_callable=AsyncMock,
return_value=mock_job,
):
result = jobs_api_sync.wait_until_complete(job="job-1")
finally:
sift_client.config.show_progress = original

assert result.job_status == JobStatus.FINISHED


class TestWaitAndDownload:
@pytest.mark.asyncio
Expand Down
37 changes: 37 additions & 0 deletions python/lib/sift_client/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Global configuration for the Sift client library."""

from __future__ import annotations

from dataclasses import dataclass, fields


@dataclass
class Config:
"""Global configuration for the Sift client library.

This is a singleton dataclass, use the module-level ``config`` instance
rather than creating your own::

import sift_client

sift_client.config.show_progress = False

Setting an attribute that doesn't exist raises ``AttributeError`` so
typos are caught immediately.

"""

show_progress: bool | None = None
"""Controls progress-bar display for job polling and file downloads.

``None`` (default) shows bars for sync calls and hides them for async.
Set to ``False`` to disable everywhere.
"""

def __setattr__(self, name: str, value: object) -> None:
if name not in {f.name for f in fields(self)}:
raise AttributeError(f"Unknown setting: {name!r}")
super().__setattr__(name, value)


config = Config()
61 changes: 53 additions & 8 deletions python/lib/sift_client/resources/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
from pathlib import Path
from typing import TYPE_CHECKING

from alive_progress import alive_bar # type: ignore[import-untyped]

import sift_client as _sift_client_module
from sift_client._internal.low_level_wrappers.jobs import JobsLowLevelClient
from sift_client._internal.util.executor import run_sync_function
from sift_client._internal.util.file import download_file, extract_zip
Expand Down Expand Up @@ -169,6 +172,7 @@ async def wait_until_complete(
*,
polling_interval_secs: int = 5,
timeout_secs: int | None = None,
show_progress: bool | None = None,
) -> Job:
"""Wait until the job is complete or the timeout is reached.

Expand All @@ -180,20 +184,45 @@ async def wait_until_complete(
polling_interval_secs: Seconds between status polls. Defaults to 5s.
timeout_secs: Maximum seconds to wait. If None, polls indefinitely.
Defaults to None (indefinite).
show_progress: If True, display an animated progress spinner alongside
the job status while polling. Defaults to True for sync, False
for async. Use ``sift_client.config.show_progress = False`` to disable
globally for sync.

Returns:
The Job in the completed state.
"""
job_id = job._id_or_error if isinstance(job, Job) else job
if show_progress is None:
global_setting = _sift_client_module.config.show_progress
if global_setting is not None:
show_progress = global_setting
elif getattr(self, "_is_sync", False):
show_progress = True
else:
show_progress = False

start = time.monotonic()
while True:
job = await self.get(job_id)
if job.job_status in (JobStatus.FINISHED, JobStatus.FAILED, JobStatus.CANCELLED):
return job
if timeout_secs is not None and (time.monotonic() - start) >= timeout_secs:
raise TimeoutError(f"Job {job_id} did not complete within {timeout_secs} seconds")
await asyncio.sleep(polling_interval_secs)
with alive_bar(
title=f"Job {job_id}: polling",
bar=None,
spinner_length=7,
spinner="dots_waves",
monitor=False,
stats=False,
disable=not show_progress,
) as bar:
while True:
job = await self.get(job_id)
bar.title(f"Job {job_id} ({job.job_type.value.lower()}): {job.job_status.value}")
bar()
if job.job_status in (JobStatus.FINISHED, JobStatus.FAILED, JobStatus.CANCELLED):
return job
if timeout_secs is not None and (time.monotonic() - start) >= timeout_secs:
raise TimeoutError(
f"Job {job_id} did not complete within {timeout_secs} seconds"
)
await asyncio.sleep(polling_interval_secs)

async def wait_and_download(
self,
Expand All @@ -203,6 +232,7 @@ async def wait_and_download(
timeout_secs: int | None = None,
output_dir: str | Path | None = None,
extract: bool = True,
show_progress: bool | None = None,
) -> list[Path]:
"""Wait for a job to complete and download the result files.

Expand All @@ -219,6 +249,10 @@ async def wait_and_download(
extract it and delete the archive, returning paths to the
extracted files. Non-zip files are returned as-is regardless
of this flag.
show_progress: If True, display an animated progress spinner
while waiting and a download progress bar. Defaults to True
for sync, False for async. Use ``sift_client.config.show_progress = False``
to disable globally for sync.

Returns:
List of paths to the downloaded/extracted files.
Expand All @@ -228,11 +262,20 @@ async def wait_and_download(
TimeoutError: If the job does not complete within timeout_secs.
"""
job_id = job._id_or_error if isinstance(job, Job) else job
if show_progress is None:
global_setting = _sift_client_module.config.show_progress
if global_setting is not None:
show_progress = global_setting
elif getattr(self, "_is_sync", False):
show_progress = True
else:
show_progress = False

completed_job = await self.wait_until_complete(
job=job_id,
polling_interval_secs=polling_interval_secs,
timeout_secs=timeout_secs,
show_progress=show_progress,
)
if completed_job.job_status == JobStatus.FAILED:
if (
Expand All @@ -259,7 +302,9 @@ async def wait_and_download(
# Run the synchronous download in a thread pool to avoid blocking the event loop
rest_client = self.client.rest_client
await run_sync_function(
lambda: download_file(presigned_url, download_path, rest_client=rest_client)
lambda: download_file(
presigned_url, download_path, rest_client=rest_client, show_progress=show_progress
)
)

if not extract or not zipfile.is_zipfile(download_path):
Expand Down
16 changes: 15 additions & 1 deletion python/lib/sift_client/resources/sync_stubs/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,7 @@ class JobsAPI:
timeout_secs: int | None = None,
output_dir: str | Path | None = None,
extract: bool = True,
show_progress: bool | None = None,
) -> list[Path]:
"""Wait for a job to complete and download the result files.

Expand All @@ -875,6 +876,10 @@ class JobsAPI:
extract it and delete the archive, returning paths to the
extracted files. Non-zip files are returned as-is regardless
of this flag.
show_progress: If True, display an animated progress spinner
while waiting and a download progress bar. Defaults to True
for sync, False for async. Use ``sift_client.config.show_progress = False``
to disable globally for sync.

Returns:
List of paths to the downloaded/extracted files.
Expand All @@ -886,7 +891,12 @@ class JobsAPI:
...

def wait_until_complete(
self, job: Job | str, *, polling_interval_secs: int = 5, timeout_secs: int | None = None
self,
job: Job | str,
*,
polling_interval_secs: int = 5,
timeout_secs: int | None = None,
show_progress: bool | None = None,
) -> Job:
"""Wait until the job is complete or the timeout is reached.

Expand All @@ -898,6 +908,10 @@ class JobsAPI:
polling_interval_secs: Seconds between status polls. Defaults to 5s.
timeout_secs: Maximum seconds to wait. If None, polls indefinitely.
Defaults to None (indefinite).
show_progress: If True, display an animated progress spinner alongside
the job status while polling. Defaults to True for sync, False
for async. Use ``sift_client.config.show_progress = False`` to disable
globally for sync.

Returns:
The Job in the completed state.
Expand Down
2 changes: 1 addition & 1 deletion python/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ plugins:
show_source: false
find_stubs_package: true
show_if_no_docstring: true
filters: "public"
filters: ["!^__(?!init)", "!^_[^_]"]
show_submodules: false
# Styling
group_by_category: true
Expand Down
Loading