Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added argos/emulator/__init__.py
Empty file.
Empty file added argos/emulator/devices/base.py
Empty file.
106 changes: 106 additions & 0 deletions argos/emulator/devices/sonic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import logging
from numpy import random
import pandas
import os

print("cwd:", os.getcwd())
from argos.utils.logging import get_logger as argos_get_logger

class demoDevice:


def __init__(self,deviceName,frequency,duration):
"""
Initializes a demo device with the fields.

Parameters
----------
deviceConfiguration : dict
A configuration of the device:


"""
self.logger = argos_get_logger(self)
self.deviceName = deviceName
self.frequency = frequency
self.duration = duration

@property
def delay_s(self):
return pandas.Timedelta('%.3fs' % (1/float(self.frequency))).value/1e9

def stream_messages(self):
import time
import pandas
from numpy import random

u = v = w = T = 0

startDate = pandas.Timestamp.now()
endTime = startDate + pandas.to_timedelta(self.duration)

timeDelta = pandas.Timedelta('%.3fs' % (1 / float(self.frequency)))

self.logger.execution(
f"Streaming from {startDate} to {endTime} at interval {timeDelta}"
)

ts = startDate

while ts <= endTime:
u = self.u_min + (self.u_max - self.u_min) * random.rand()
v = self.v_min + (self.v_max - self.v_min) * random.rand()
w = self.w_min + (self.w_max - self.w_min) * random.rand()
T = self.T_min + (self.T_max - self.T_min) * random.rand()

yield {
"datetime": ts,
"u": u,
"v": v,
"w": w,
"t": T
}

ts += timeDelta
time.sleep(timeDelta.total_seconds())
def emmitMessage(self):
pass

class deviceSonic(demoDevice):

u_max = 6
u_min = -6
v_max = 6
v_min = -6
w_max = 0.3
w_min = -0.3
T_max = 40
T_min = 20

def __init__(self,deviceName,frequency,duration):
super().__init__(deviceName,frequency,duration)


def createMessages(self):
u = 0
v = 0
w = 0
T = 0
self.logger.execution("------------ Start -----------------")
self.logger.debug(f"Frequency {self.frequency} duration {self.duration}")
startDate = pandas.Timestamp.now()
endTime = startDate + pandas.to_timedelta(self.duration)
timeDelta = pandas.Timedelta('%.3fs' % (1/float(self.frequency)))

self.logger.execution(f"From {startDate} to {endTime} at frequency {timeDelta}")

dateRange = pandas.date_range(startDate,endTime,freq=timeDelta)
self.logger.execution(f"Creating {len(dateRange)} messages")
for ts in dateRange:
u = self.u_min + (self.u_max - self.u_min) * random.rand()
v = self.v_min + (self.v_max - self.v_min) * random.rand()
w = self.w_min + (self.w_max - self.w_min) * random.rand()
T = self.T_min + (self.T_max - self.T_min) * random.rand()
self.messages.append(dict(datetime=ts.value,u=u,v=v,w=w,t=T))

self.logger.execution("------------ End -----------------")
95 changes: 95 additions & 0 deletions argos/emulator/run_devices.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#! /usr/bin/env python
import argparse
import json
import logging
from devices.sonic import deviceSonic
from transports.my_kafka import KafkaTransport
from transports.tcp import TCPTransport

if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("emulator.run_device")

parser = argparse.ArgumentParser()

parser.add_argument(
"--deviceName",
required=True,
help="Device name"
)

parser.add_argument(
"--frequency",
required=True,
type=float,
help="Frequency in Hz"
)

parser.add_argument(
"--duration",
required=True,
help="Duration string (example: 10s, 5m, 1h)"
)

parser.add_argument(
"--topic",
required=True,
help="Kafka topic"
)

parser.add_argument(
"--bootstrapServers",
default="127.0.0.1:9092",
help="Kafka bootstrap servers"
)

parser.add_argument(
"--host",
default="127.0.0.1",
help="TCP server host"
)

parser.add_argument(
"--port",
type=int,
default=9000,
help="TCP server port"
)

args = parser.parse_args()

logger.info("Starting emulator")

device = deviceSonic(
deviceName=args.deviceName,
frequency=args.frequency,
duration=args.duration
)



transport = KafkaTransport()

try:

for msg in device.stream_messages():

sndmsg = json.dumps(msg, default=str)

logger.info(
f"[{args.deviceName}] Sending message: {sndmsg}"
)

transport.send(
msg=sndmsg
)

except KeyboardInterrupt:
logger.info("Stopping emulator")

finally:
transport.close()
logger.info("Transport closed")

"""
"""
112 changes: 112 additions & 0 deletions argos/emulator/run_devices_to_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#!/usr/bin/env python
import argparse
import json
import logging

from devices.sonic import deviceSonic

import csv

class CsvTransport:

def __init__(self, filename="sonic_data.csv"):
self.file = open(filename, "a", newline="")
self.writer = None

def send(self, topic: str, msg: str):
data = json.loads(msg)

if self.writer is None:
self.writer = csv.DictWriter(
self.file,
fieldnames=data.keys()
)
self.writer.writeheader()

self.writer.writerow(data)
self.file.flush()

def close(self):
self.file.close()

"""
class ConsoleTransport:

Simple transport that prints messages instead of sending them anywhere.


def send(self, topic: str, msg: str):
print(f"[TOPIC: {topic}] {msg}")

def close(self):
print("ConsoleTransport closed")
"""

def parse_args():
parser = argparse.ArgumentParser()

parser.add_argument(
"--deviceName",
default="test_device",
help="Device name (default: test_device)"
)

parser.add_argument(
"--frequency",
type=float,
default=1.0,
help="Frequency in Hz (default: 1.0)"
)

parser.add_argument(
"--duration",
default="10s",
help="Duration string (default: 10s)"
)

parser.add_argument(
"--topic",
default="test-topic",
help="Logical topic name (default: test-topic)"
)

return parser.parse_args()


if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("emulator.run_device")

args = parse_args()

logger.info("Starting emulator (LOCAL MODE - no Kafka)")

device = deviceSonic(
deviceName=args.deviceName,
frequency=args.frequency,
duration=args.duration
)

transport = CsvTransport("sonic_data.csv")
try:
for msg in device.stream_messages():

sndmsg = json.dumps(msg, default=str)

logger.info(
f"[{args.deviceName}] Sending message: {sndmsg}"
)

transport.send(
topic=args.topic,
msg=sndmsg
)

except KeyboardInterrupt:
logger.info("Stopping emulator")

finally:
transport.close()
logger.info("Transport closed")


47 changes: 47 additions & 0 deletions argos/emulator/transports/my_kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import json
import time
import logging
from kafka import KafkaProducer
import sys
import os

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))

from ..devices.sonic import deviceSonic


class KafkaTransport:
def __init__(self, bootstrap_servers="127.0.0.1:9092"):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode("utf-8")
)
self.logger = logging.getLogger("emulator.transport.kafka")

def send(self, topic, msg: dict):
self.logger.info(f"Sending message: {msg}")
self.producer.send(topic, msg)

def close(self):
self.producer.close()


def run_emulator(args):
logger = logging.getLogger("emulator.runner")

logger.info("Creating device")
device = deviceSonic(
deviceName=args.deviceName,
frequency=args.frequency,
duration=args.duration
)

transport = KafkaTransport("127.0.0.1:9092")

logger.info("Sending messages")

for msg in device.messages:
logger.info(f"msg {msg.get('datetime')}: {msg}")
transport.send(args.topic, msg)
time.sleep(device.delay_s)

Loading
Loading