From 5f2b5406f5823b66d3e3d1f109d0e55864bb3dfc Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Mon, 16 Feb 2026 11:43:19 +0100 Subject: [PATCH 1/2] Add command line interface --- README.md | 30 +++++++++++++++++ pyproject.toml | 1 + src/imas_streams/__main__.py | 5 +++ src/imas_streams/cli.py | 64 ++++++++++++++++++++++++++++++++++++ 4 files changed, 100 insertions(+) create mode 100644 src/imas_streams/__main__.py create mode 100644 src/imas_streams/cli.py diff --git a/README.md b/README.md index 2888cf5..9d5770d 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,36 @@ Producers and consumers of streaming IMAS data. This project is in active development and may introduce breaking changes at any moment. +## Installation + +IMAS-Streams is not yet available on PyPI, but you can install it directly from +this git repository: + +```bash +pip install 'imas-streams @ git+https://github.com/iterorganization/IMAS-Streams.git' +# If you wish to use the kafka features: +pip install 'imas-streams[kafka] @ git+https://github.com/iterorganization/IMAS-Streams.git' +``` + +## Command line interface + +IMAS-Streams comes with a command line interface: the program `imas-streams` +should be available after installation. Note that some commands may require the +optional `kafka` dependency, see [Installation](#installation). + +```bash +imas-streams --version # display version of installed imas-streams library +imas-streams --help # display help message and list available commands +``` + +## Application programming interface (API) + +API documentation is not yet available. The code is self-documented with +docstrings. All classes directly available in the +[`imas_streams`](src/imas_streams/__init__.py) and +[`imas_streams.kafka`](src/imas_streams/kafka.py) modules are considered public +API. 
+ ## Design goal and use cases The goal of this project is to define a way to efficiently stream diff --git a/pyproject.toml b/pyproject.toml index 250caaa..2b64f05 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,7 @@ where = ["src"] include = ["imas_streams*"] [project.scripts] +imas-streams = "imas_streams.cli:main" [tool.setuptools_scm] local_scheme = "no-local-version" diff --git a/src/imas_streams/__main__.py b/src/imas_streams/__main__.py new file mode 100644 index 0000000..fa19107 --- /dev/null +++ b/src/imas_streams/__main__.py @@ -0,0 +1,5 @@ +# Allows executing imas_streams as a module with `python -m imas_streams` + +from imas_streams.cli import main + +main() diff --git a/src/imas_streams/cli.py b/src/imas_streams/cli.py new file mode 100644 index 0000000..09a152c --- /dev/null +++ b/src/imas_streams/cli.py @@ -0,0 +1,64 @@ +import logging + +import click +import imas + +from imas_streams import BatchedIDSConsumer + + +@click.group(invoke_without_command=True, no_args_is_help=True) +@click.version_option() +def main() -> None: + """Command line utilities for streaming IMAS data.""" + # Disable IMAS-Python log handler (prevent double output for imas log messages) + imas_logger = logging.getLogger("imas") + for handler in imas_logger.handlers: + imas_logger.removeHandler(handler) + # Set up our own basic log handler, writing messages to sys.stderr + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s [%(name)s] %(message)s", + ) + + +@main.command() +@click.argument("kafka_host") +@click.argument("kafka_topic") +@click.argument("imas_uri") +@click.option( + "--batch-size", default=100, help="Number of time slices to batch per put_slice." +) +@click.option( + "--overwrite", is_flag=True, help="Overwrite any existing IMAS Data Entry." 
+) +def kafka_to_imasentry( + kafka_host: str, kafka_topic: str, imas_uri: str, batch_size: int, overwrite: bool +): + """Consume streaming IMAS data from Kafka and store data in an IMAS Data Entry. + + N.B. This program requires the optional kafka dependency. + + \b + Arguments: + KAFKA_HOST Kafka host and port (aka bootstrap.servers). E.g. 'localhost:9092' + KAFKA_TOPIC Name of the kafka topic with streaming IMAS data. + IMAS_URI IMAS URI to store the data at, for example 'imas:hdf5?path=./out'. + The program will not overwrite existing data (unless the --overwrite + flag is given). Only backends that implement 'put_slice' are + supported, such as HDF5 and MDSPLUS. + """ + # Local import: kafka is an optional dependency + from imas_streams.kafka import KafkaConsumer, KafkaSettings + + consumer = KafkaConsumer( + KafkaSettings(host=kafka_host, topic_name=kafka_topic), + BatchedIDSConsumer, + batch_size=batch_size, + return_copy=False, + ) + + mode = "w" if overwrite else "x" + with imas.DBEntry(imas_uri, mode) as entry: + for result in consumer.stream(): + if result is not None: + entry.put_slice(result) From 5239a9245a15923231cd2d69f6b7e97688533095 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Mon, 16 Feb 2026 12:01:41 +0100 Subject: [PATCH 2/2] Add CLI tests --- tests/test_cli.py | 46 +++++++++++++++++++++++++++++++++++++++++++++ tests/test_kafka.py | 3 ++- 2 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 tests/test_cli.py diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..cec703e --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,46 @@ +from unittest.mock import patch + +import imas +from click.testing import CliRunner +from imas.ids_defs import IDS_TIME_MODE_HOMOGENEOUS +from imas.util import idsdiffgen + +from imas_streams import BatchedIDSConsumer +from imas_streams.cli import main +from imas_streams.kafka import KafkaSettings + + +def test_version(): + runner = CliRunner() + result = 
runner.invoke(main, ["--version"]) + assert result.exit_code == 0 + + +@patch("imas_streams.kafka.KafkaConsumer") +def test_kafka_to_imasentry(mock_kafkaconsumer, tmp_path): + # Make some testdata to store + ids = imas.IDSFactory().core_profiles() + ids.ids_properties.homogeneous_time = IDS_TIME_MODE_HOMOGENEOUS + ids.time = [0.0, 0.1, 0.2] + ids.global_quantities.ip = [0.2, 0.3, 0.4] + mock_kafkaconsumer.return_value.stream.return_value = [None, None, ids] + + # Run CLI + runner = CliRunner() + uri = f"imas:hdf5?path={tmp_path}" + result = runner.invoke( + main, + ["kafka-to-imasentry", "kafka_host:port", "topic_name", uri, "--batch-size=10"], + ) + + assert result.exit_code == 0 + mock_kafkaconsumer.assert_called_once_with( + KafkaSettings(host="kafka_host:port", topic_name="topic_name"), + BatchedIDSConsumer, + batch_size=10, + return_copy=False, + ) + # Check that the IDS was stored correctly + with imas.DBEntry(uri, "r") as entry: + ids2 = entry.get("core_profiles") + assert list(idsdiffgen(ids, ids2)) == [] diff --git a/tests/test_kafka.py b/tests/test_kafka.py index 3c8e8da..37cc7c7 100644 --- a/tests/test_kafka.py +++ b/tests/test_kafka.py @@ -4,6 +4,7 @@ import imas import pytest +from imas.ids_defs import IDS_TIME_MODE_HOMOGENEOUS pytest.importorskip("confluent_kafka") @@ -26,7 +27,7 @@ def kafka_host(): @pytest.fixture def test_magnetics(): ids = imas.IDSFactory().magnetics() - ids.ids_properties.homogeneous_time = imas.ids_defs.IDS_TIME_MODE_HOMOGENEOUS + ids.ids_properties.homogeneous_time = IDS_TIME_MODE_HOMOGENEOUS ids.time = [1.0] ids.flux_loop.resize(1) ids.flux_loop[0].name = "test"