diff --git a/argos/emulator/__init__.py b/argos/emulator/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/argos/emulator/devices/base.py b/argos/emulator/devices/base.py new file mode 100644 index 0000000..e69de29 diff --git a/argos/emulator/devices/sonic.py b/argos/emulator/devices/sonic.py new file mode 100644 index 0000000..90da14a --- /dev/null +++ b/argos/emulator/devices/sonic.py @@ -0,0 +1,106 @@ +import logging +from numpy import random +import pandas +import os + +print("cwd:", os.getcwd()) +from argos.utils.logging import get_logger as argos_get_logger + +class demoDevice: + + + def __init__(self,deviceName,frequency,duration): + """ + Initializes a demo device with the fields. + + Parameters + ---------- + deviceConfiguration : dict + A configuration of the device: + + + """ + self.logger = argos_get_logger(self) + self.deviceName = deviceName + self.frequency = frequency + self.duration = duration + + @property + def delay_s(self): + return pandas.Timedelta('%.3fs' % (1/float(self.frequency))).value/1e9 + + def stream_messages(self): + import time + import pandas + from numpy import random + + u = v = w = T = 0 + + startDate = pandas.Timestamp.now() + endTime = startDate + pandas.to_timedelta(self.duration) + + timeDelta = pandas.Timedelta('%.3fs' % (1 / float(self.frequency))) + + self.logger.execution( + f"Streaming from {startDate} to {endTime} at interval {timeDelta}" + ) + + ts = startDate + + while ts <= endTime: + u = self.u_min + (self.u_max - self.u_min) * random.rand() + v = self.v_min + (self.v_max - self.v_min) * random.rand() + w = self.w_min + (self.w_max - self.w_min) * random.rand() + T = self.T_min + (self.T_max - self.T_min) * random.rand() + + yield { + "datetime": ts, + "u": u, + "v": v, + "w": w, + "t": T + } + + ts += timeDelta + time.sleep(timeDelta.total_seconds()) + def emmitMessage(self): + pass + +class deviceSonic(demoDevice): + + u_max = 6 + u_min = -6 + v_max = 6 + v_min = -6 + w_max = 0.3 + w_min = -0.3 + T_max = 40 + T_min = 20 + + def __init__(self,deviceName,frequency,duration): + super().__init__(deviceName,frequency,duration) + + + def createMessages(self): + u = 0 + v = 0 + w = 0 + T = 0 + self.logger.execution("------------ Start -----------------") + self.logger.debug(f"Frequency {self.frequency} duration {self.duration}") + startDate = pandas.Timestamp.now() + endTime = startDate + pandas.to_timedelta(self.duration) + timeDelta = pandas.Timedelta('%.3fs' % (1/float(self.frequency))) + + self.logger.execution(f"From {startDate} to {endTime} at frequency {timeDelta}") + + dateRange = pandas.date_range(startDate,endTime,freq=timeDelta) + self.logger.execution(f"Creating {len(dateRange)} messages") + for ts in dateRange: + u = self.u_min + (self.u_max - self.u_min) * random.rand() + v = self.v_min + (self.v_max - self.v_min) * random.rand() + w = self.w_min + (self.w_max - self.w_min) * random.rand() + T = self.T_min + (self.T_max - self.T_min) * random.rand() + self.messages.append(dict(datetime=ts.value,u=u,v=v,w=w,t=T)) + + self.logger.execution("------------ End -----------------") diff --git a/argos/emulator/run_devices.py b/argos/emulator/run_devices.py new file mode 100644 index 0000000..96b5333 --- /dev/null +++ b/argos/emulator/run_devices.py @@ -0,0 +1,95 @@ +#! /usr/bin/env python +import argparse +import json +import logging +from devices.sonic import deviceSonic +from transports.my_kafka import KafkaTransport +from transports.tcp import TCPTransport + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + logger = logging.getLogger("emulator.run_device") + + parser = argparse.ArgumentParser() + + parser.add_argument( + "--deviceName", + required=True, + help="Device name" + ) + + parser.add_argument( + "--frequency", + required=True, + type=float, + help="Frequency in Hz" + ) + + parser.add_argument( + "--duration", + required=True, + help="Duration string (example: 10s, 5m, 1h)" + ) + + parser.add_argument( + "--topic", + required=True, + help="Kafka topic" + ) + + parser.add_argument( + "--bootstrapServers", + default="127.0.0.1:9092", + help="Kafka bootstrap servers" + ) + + parser.add_argument( + "--host", + default="127.0.0.1", + help="TCP server host" + ) + + parser.add_argument( + "--port", + type=int, + default=9000, + help="TCP server port" + ) + + args = parser.parse_args() + + logger.info("Starting emulator") + + device = deviceSonic( + deviceName=args.deviceName, + frequency=args.frequency, + duration=args.duration + ) + + + + transport = KafkaTransport() + + try: + + for msg in device.stream_messages(): + + sndmsg = json.dumps(msg, default=str) + + logger.info( + f"[{args.deviceName}] Sending message: {sndmsg}" + ) + + transport.send( + msg=sndmsg + ) + + except KeyboardInterrupt: + logger.info("Stopping emulator") + + finally: + transport.close() + logger.info("Transport closed") + +""" +""" \ No newline at end of file diff --git a/argos/emulator/run_devices_to_csv.py b/argos/emulator/run_devices_to_csv.py new file mode 100644 index 0000000..1dc930e --- /dev/null +++ b/argos/emulator/run_devices_to_csv.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python +import argparse +import json +import logging + +from devices.sonic import deviceSonic + +import csv + +class CsvTransport: + + def __init__(self, filename="sonic_data.csv"): + self.file = open(filename, "a", newline="") + self.writer = None + + def send(self, topic: str, msg: str): + data = json.loads(msg) + + if self.writer is None: + self.writer = csv.DictWriter( + self.file, + fieldnames=data.keys() + ) + self.writer.writeheader() + + self.writer.writerow(data) + self.file.flush() + + def close(self): + self.file.close() + +""" +class ConsoleTransport: + + Simple transport that prints messages instead of sending them anywhere. + + + def send(self, topic: str, msg: str): + print(f"[TOPIC: {topic}] {msg}") + + def close(self): + print("ConsoleTransport closed") +""" + +def parse_args(): + parser = argparse.ArgumentParser() + + parser.add_argument( + "--deviceName", + default="test_device", + help="Device name (default: test_device)" + ) + + parser.add_argument( + "--frequency", + type=float, + default=1.0, + help="Frequency in Hz (default: 1.0)" + ) + + parser.add_argument( + "--duration", + default="10s", + help="Duration string (default: 10s)" + ) + + parser.add_argument( + "--topic", + default="test-topic", + help="Logical topic name (default: test-topic)" + ) + + return parser.parse_args() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + logger = logging.getLogger("emulator.run_device") + + args = parse_args() + + logger.info("Starting emulator (LOCAL MODE - no Kafka)") + + device = deviceSonic( + deviceName=args.deviceName, + frequency=args.frequency, + duration=args.duration + ) + + transport = CsvTransport("sonic_data.csv") + try: + for msg in device.stream_messages(): + + sndmsg = json.dumps(msg, default=str) + + logger.info( + f"[{args.deviceName}] Sending message: {sndmsg}" + ) + + transport.send( + topic=args.topic, + msg=sndmsg + ) + + except KeyboardInterrupt: + logger.info("Stopping emulator") + + finally: + transport.close() + logger.info("Transport closed") + + diff --git a/argos/emulator/transports/my_kafka.py b/argos/emulator/transports/my_kafka.py new file mode 100644 index 0000000..126ab9a --- /dev/null +++ b/argos/emulator/transports/my_kafka.py @@ -0,0 +1,47 @@ +import json +import time +import logging +from kafka import KafkaProducer +import sys +import os + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))) + +from ..devices.sonic import deviceSonic + + +class KafkaTransport: + def __init__(self, bootstrap_servers="127.0.0.1:9092"): + self.producer = KafkaProducer( + bootstrap_servers=bootstrap_servers, + value_serializer=lambda v: json.dumps(v).encode("utf-8") + ) + self.logger = logging.getLogger("emulator.transport.kafka") + + def send(self, topic, msg: dict): + self.logger.info(f"Sending message: {msg}") + self.producer.send(topic, msg) + + def close(self): + self.producer.close() + + +def run_emulator(args): + logger = logging.getLogger("emulator.runner") + + logger.info("Creating device") + device = deviceSonic( + deviceName=args.deviceName, + frequency=args.frequency, + duration=args.duration + ) + + transport = KafkaTransport("127.0.0.1:9092") + + logger.info("Sending messages") + + for msg in device.messages: + logger.info(f"msg {msg.get('datetime')}: {msg}") + transport.send(args.topic, msg) + time.sleep(device.delay_s) + diff --git a/argos/emulator/transports/tcp.py b/argos/emulator/transports/tcp.py new file mode 100644 index 0000000..039b7dc --- /dev/null +++ b/argos/emulator/transports/tcp.py @@ -0,0 +1,103 @@ +import socket +import time +import logging + + +class TCPTransport: + + def __init__( + self, + host="127.0.0.1", + port=9000, + reconnect_delay=3 + ): + + self.logger = logging.getLogger("emulator.transport.tcp") + + self.host = host + self.port = port + self.reconnect_delay = reconnect_delay + + self.sock = None + + self.connect() + + def connect(self): + + while True: + + try: + + self.logger.info( + f"Connecting to {self.host}:{self.port}" + ) + + self.sock = socket.socket( + socket.AF_INET, + socket.SOCK_STREAM + ) + + self.sock.connect( + (self.host, self.port) + ) + + self.logger.info("Connected") + + return + + except Exception as e: + + self.logger.warning( + f"Connection failed: {e}" + ) + + self.logger.info( + f"Retrying in {self.reconnect_delay}s" + ) + + time.sleep(self.reconnect_delay) + + def send(self, msg: str): + + try: + + # newline-delimited JSON + payload = msg + "\n" + + self.sock.sendall( + payload.encode() + ) + + except Exception as e: + + self.logger.warning( + f"Send failed: {e}" + ) + + self.close() + + self.logger.info( + "Attempting reconnect" + ) + + self.connect() + + # retry once after reconnect + payload = msg + "\n" + + self.sock.sendall( + payload.encode() + ) + + def close(self): + + if self.sock: + + try: + self.sock.close() + + except Exception: + pass + + self.sock = None +