diff --git a/src/imas_streams/cli.py b/src/imas_streams/cli.py index 09a152c..ee34ea1 100644 --- a/src/imas_streams/cli.py +++ b/src/imas_streams/cli.py @@ -31,8 +31,14 @@ def main() -> None: @click.option( "--overwrite", is_flag=True, help="Overwrite any existing IMAS Data Entry." ) +@click.option("--timeout", "-t", default=5.0, help="Timeout for receiving next message") def kafka_to_imasentry( - kafka_host: str, kafka_topic: str, imas_uri: str, batch_size: int, overwrite: bool + kafka_host: str, + kafka_topic: str, + imas_uri: str, + batch_size: int, + overwrite: bool, + timeout: float, ): """Consume streaming IMAS data from Kafka and store data in an IMAS Data Entry. @@ -59,6 +65,6 @@ def kafka_to_imasentry( mode = "w" if overwrite else "x" with imas.DBEntry(imas_uri, mode) as entry: - for result in consumer.stream(): + for result in consumer.stream(timeout=timeout): if result is not None: entry.put_slice(result)