Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,36 @@ Producers and consumers of streaming IMAS data.
This project is in active development and may introduce breaking changes at any
moment.

## Installation

IMAS-Streams is not yet available on PyPI, but you can install it directly from
this git repository:

```bash
pip install 'imas-streams @ git+https://github.com/iterorganization/IMAS-Streams.git'
# If you wish to use the kafka features:
pip install 'imas-streams[kafka] @ git+https://github.com/iterorganization/IMAS-Streams.git'
```

## Command line interface

IMAS-Streams comes with a command line interface: the program `imas-streams`
should be available after installation. Note that some commands may require the
optional `kafka` dependency, see [Installation](#installation).

```bash
imas-streams --version # display version of installed imas-streams library
imas-streams --help # display help message and list available commands
```

## Application programming interface (API)

API documentation is not yet available. The code is self-documented with
docstrings. All classes directly available in the
[`imas_streams`](src/imas_streams/__init__.py) and
[`imas_streams.kafka`](src/imas_streams/kafka.py) modules are considered public
API.

## Design goal and use cases

The goal of this project is to define a way to efficiently stream
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ where = ["src"]
include = ["imas_streams*"]

[project.scripts]
imas-streams = "imas_streams.cli:main"

[tool.setuptools_scm]
local_scheme = "no-local-version"
Expand Down
5 changes: 5 additions & 0 deletions src/imas_streams/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Allows executing imas_streams as a module with `python -m imas_streams`

from imas_streams.cli import main

main()
64 changes: 64 additions & 0 deletions src/imas_streams/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import logging

import click
import imas

from imas_streams import BatchedIDSConsumer


@click.group(invoke_without_command=True, no_args_is_help=True)
@click.version_option()
def main() -> None:
    """Command line utilities for streaming IMAS data."""
    # Disable IMAS-Python log handlers (prevents double output for imas log
    # messages once we install our own handler below).
    # Iterate over a copy: removeHandler() mutates logger.handlers, and
    # removing items from the list being iterated would skip every other
    # handler.
    imas_logger = logging.getLogger("imas")
    for handler in list(imas_logger.handlers):
        imas_logger.removeHandler(handler)
    # Set up our own basic log handler, writing messages to sys.stderr
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
    )


@main.command()
@click.argument("kafka_host")
@click.argument("kafka_topic")
@click.argument("imas_uri")
@click.option(
    "--batch-size", default=100, help="Number of time slices to batch per put_slice."
)
@click.option(
    "--overwrite", is_flag=True, help="Overwrite any existing IMAS Data Entry."
)
def kafka_to_imasentry(
    kafka_host: str, kafka_topic: str, imas_uri: str, batch_size: int, overwrite: bool
) -> None:
    """Consume streaming IMAS data from Kafka and store data in an IMAS Data Entry.

    N.B. This program requires the optional kafka dependency.

    \b
    Arguments:
      KAFKA_HOST  Kafka host and port (aka bootstrap.servers). E.g. 'localhost:9092'
      KAFKA_TOPIC Name of the kafka topic with streaming IMAS data.
      IMAS_URI    IMAS URI to store the data at, for example 'imas:hdf5?path=./out'.
                  The program will not overwrite existing data (unless the --overwrite
                  flag is given). Only backends that implement 'put_slice' are
                  supported, such as HDF5 and MDSPLUS.
    """
    # Local import: kafka is an optional dependency
    from imas_streams.kafka import KafkaConsumer, KafkaSettings

    consumer = KafkaConsumer(
        KafkaSettings(host=kafka_host, topic_name=kafka_topic),
        BatchedIDSConsumer,
        batch_size=batch_size,
        return_copy=False,
    )

    # "x" raises when the Data Entry already exists, "w" overwrites it.
    mode = "w" if overwrite else "x"
    with imas.DBEntry(imas_uri, mode) as entry:
        for result in consumer.stream():
            # NOTE(review): stream() appears to yield None while no complete
            # batch is available yet (see BatchedIDSConsumer) — only store
            # actual batches.
            if result is not None:
                entry.put_slice(result)
46 changes: 46 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from unittest.mock import patch

import imas
from click.testing import CliRunner
from imas.ids_defs import IDS_TIME_MODE_HOMOGENEOUS
from imas.util import idsdiffgen

from imas_streams import BatchedIDSConsumer
from imas_streams.cli import main
from imas_streams.kafka import KafkaSettings


def test_version():
    """Invoking the CLI with --version exits successfully."""
    outcome = CliRunner().invoke(main, ["--version"])
    assert outcome.exit_code == 0


@patch("imas_streams.kafka.KafkaConsumer")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to also mock the ids put_slice part?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed: mocking the put_slice doesn't improve the readability of the test, so we'll keep it as is.

def test_kafka_to_imasentry(mock_kafkaconsumer, tmp_path):
    """End-to-end CLI test: a mocked Kafka stream is stored in an HDF5 entry."""
    # Prepare a small homogeneous-time core_profiles IDS as test data
    ids = imas.IDSFactory().core_profiles()
    ids.ids_properties.homogeneous_time = IDS_TIME_MODE_HOMOGENEOUS
    ids.time = [0.0, 0.1, 0.2]
    ids.global_quantities.ip = [0.2, 0.3, 0.4]
    # The CLI must skip the leading None values and store only the IDS
    mock_kafkaconsumer.return_value.stream.return_value = [None, None, ids]

    # Run CLI
    uri = f"imas:hdf5?path={tmp_path}"
    cli_args = [
        "kafka-to-imasentry",
        "kafka_host:port",
        "topic_name",
        uri,
        "--batch-size=10",
    ]
    result = CliRunner().invoke(main, cli_args)
    assert result.exit_code == 0

    # The consumer must be constructed with the CLI-provided settings
    mock_kafkaconsumer.assert_called_once_with(
        KafkaSettings(host="kafka_host:port", topic_name="topic_name"),
        BatchedIDSConsumer,
        batch_size=10,
        return_copy=False,
    )
    # Check that the IDS was stored correctly
    with imas.DBEntry(uri, "r") as entry:
        stored = entry.get("core_profiles")
    assert list(idsdiffgen(ids, stored)) == []
3 changes: 2 additions & 1 deletion tests/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import imas
import pytest
from imas.ids_defs import IDS_TIME_MODE_HOMOGENEOUS

pytest.importorskip("confluent_kafka")

Expand All @@ -26,7 +27,7 @@ def kafka_host():
@pytest.fixture
def test_magnetics():
ids = imas.IDSFactory().magnetics()
ids.ids_properties.homogeneous_time = imas.ids_defs.IDS_TIME_MODE_HOMOGENEOUS
ids.ids_properties.homogeneous_time = IDS_TIME_MODE_HOMOGENEOUS
ids.time = [1.0]
ids.flux_loop.resize(1)
ids.flux_loop[0].name = "test"
Expand Down