Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
198acb7
139: Moving big chunks of code places.
lfse-slafleur Oct 28, 2025
7af8771
139: Finish up first draft before executing it.
lfse-slafleur Oct 31, 2025
69ebb06
139: Get async mostly to work, started on getting sync to work.
lfse-slafleur Oct 31, 2025
8578468
Merge branch 'main' into 139-s2connection-should-have-an-async-capabl…
lfse-slafleur Nov 27, 2025
e1c98b8
139: Add a bunch of stuff. Threading/task management is now clear for…
lfse-slafleur Mar 9, 2026
bfaaab0
139: Some fixes.
lfse-slafleur Mar 9, 2026
18b8919
139: Fix all linting issues.
lfse-slafleur Mar 9, 2026
ad677fc
139: Fix all linting and typing issues.
lfse-slafleur Mar 10, 2026
6b4fa58
Merge branch 'main' into 139-s2connection-should-have-an-async-capabl…
lfse-slafleur Mar 10, 2026
fb15159
139: Propagate asset details from ResourceManagerHandler to all under…
lfse-slafleur Mar 10, 2026
d1d2231
139: Fix missing return value in func sig.
lfse-slafleur Mar 10, 2026
2e43cb9
Merge branch 'main' into 139-s2connection-should-have-an-async-capabl…
lfse-slafleur Mar 10, 2026
fb96895
139: Add examples on how to perform another task after the connection…
lfse-slafleur Mar 18, 2026
9aff094
139: Add functionality to let ws_medium disconnect.
lfse-slafleur Mar 18, 2026
4831386
139: websockets import should be behind the try import.
lfse-slafleur Mar 18, 2026
a2ce80d
139: Fix typing and linting issues and add docs regarding the verify_…
lfse-slafleur Mar 18, 2026
6537af3
139: Document the send_and_await_reception_status functions for both …
lfse-slafleur Mar 18, 2026
5c0f1dd
139: Add DDBC control type handlers in class based approach.
lfse-slafleur Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ ignore-paths=src/s2python/generated/
# avoid hangs.
jobs=1

disable=missing-class-docstring,missing-module-docstring,too-few-public-methods,missing-function-docstring,no-member,unsubscriptable-object,line-too-long
disable=missing-class-docstring,missing-module-docstring,too-few-public-methods,missing-function-docstring,no-member,unsubscriptable-object,line-too-long,duplicate-code
266 changes: 266 additions & 0 deletions examples/async_frbc_rm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
import argparse
import asyncio
import logging
import random
import sys
import uuid
import signal
import datetime
from typing import Optional

from s2python.connection.types import S2ConnectionEventsAndMessages, SendOkayRunAsync
from s2python.common import (
Duration,
Role,
RoleType,
Commodity,
Currency,
NumberRange,
PowerRange,
CommodityQuantity,
PowerValue,
PowerMeasurement,
)
from s2python.frbc import (
FRBCInstruction,
FRBCSystemDescription,
FRBCActuatorDescription,
FRBCStorageDescription,
FRBCOperationMode,
FRBCOperationModeElement,
FRBCFillLevelTargetProfile,
FRBCFillLevelTargetProfileElement,
FRBCStorageStatus,
FRBCActuatorStatus,
)
from s2python.connection import AssetDetails
from s2python.connection.async_ import S2AsyncConnection, WebsocketClientMedium
from s2python.connection.async_.control_type.class_based import (
FRBCControlType,
NoControlControlType,
ResourceManagerHandler,
)

logger = logging.getLogger("s2python")
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.DEBUG)


class SendPowerMeasurementPeriodically:
_connection: S2AsyncConnection
_period: datetime.timedelta
_task: Optional[asyncio.Task]

def __init__(self, connection: S2AsyncConnection, period: datetime.timedelta):
self._connection = connection
self._period = period
self._task = None

async def _send_power_measurement(self):
while True:
# Grab the value from an API or anywhere else. Using a random value in this example.
value = random.uniform(10.0, 100.0)
print(f"Sending a power measurement message with value={value}")
await self._connection.send_msg_and_await_reception_status(
PowerMeasurement(
message_id=uuid.uuid4(),
values=[
PowerValue(
value=value,
commodity_quantity=CommodityQuantity.ELECTRIC_POWER_3_PHASE_SYMMETRIC,
)
],
measurement_timestamp=datetime.datetime.now(tz=datetime.timezone.utc),
)
)
print("Sent a power measurement message.")
await asyncio.sleep(self._period.total_seconds())

async def start(self):
if self._task is not None:
raise RuntimeError("Already started")
print("Start sending power measurements periodically")
self._task = asyncio.create_task(self._send_power_measurement())

async def stop(self):
if self._task is None:
raise RuntimeError("Not started yet")
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
print("Stopped sending power measurements periodically")


class MyFRBCControlType(FRBCControlType):
_power_measurement_task: Optional[SendPowerMeasurementPeriodically]

def __init__(self):
super().__init__()
self._power_measurement_task = None

async def handle_instruction(
self,
connection: S2AsyncConnection,
msg: S2ConnectionEventsAndMessages,
send_okay: SendOkayRunAsync,
) -> None:
if not isinstance(msg, FRBCInstruction):
raise RuntimeError(
f"Expected an FRBCInstruction but received a message of type {type(msg)}."
)
print(f"I have received the message {msg} from {connection}")

async def activate(self, connection: S2AsyncConnection) -> None:
print("The control type FRBC is now activated.")

print("Time to send a FRBC SystemDescription")
actuator_id = uuid.uuid4()
operation_mode_id = uuid.uuid4()
await connection.send_msg_and_await_reception_status(
FRBCSystemDescription(
message_id=uuid.uuid4(),
valid_from=datetime.datetime.now(tz=datetime.timezone.utc),
actuators=[
FRBCActuatorDescription(
id=actuator_id,
operation_modes=[
FRBCOperationMode(
id=operation_mode_id,
elements=[
FRBCOperationModeElement(
fill_level_range=NumberRange(
start_of_range=0.0, end_of_range=100.0
),
fill_rate=NumberRange(
start_of_range=-5.0, end_of_range=5.0
),
power_ranges=[
PowerRange(
start_of_range=-200.0,
end_of_range=200.0,
commodity_quantity=CommodityQuantity.ELECTRIC_POWER_3_PHASE_SYMMETRIC,
)
],
)
],
diagnostic_label="Load & unload battery",
abnormal_condition_only=False,
)
],
transitions=[],
timers=[],
supported_commodities=[Commodity.ELECTRICITY],
)
],
storage=FRBCStorageDescription(
fill_level_range=NumberRange(start_of_range=0.0, end_of_range=100.0),
fill_level_label="%",
diagnostic_label="Imaginary battery",
provides_fill_level_target_profile=True,
provides_leakage_behaviour=False,
provides_usage_forecast=False,
),
)
)
print("Also send the target profile")

await connection.send_msg_and_await_reception_status(
FRBCFillLevelTargetProfile(
message_id=uuid.uuid4(),
start_time=datetime.datetime.now(tz=datetime.timezone.utc),
elements=[
FRBCFillLevelTargetProfileElement(
duration=Duration.from_milliseconds(30_000),
fill_level_range=NumberRange(start_of_range=20.0, end_of_range=30.0),
),
FRBCFillLevelTargetProfileElement(
duration=Duration.from_milliseconds(300_000),
fill_level_range=NumberRange(start_of_range=40.0, end_of_range=50.0),
),
],
)
)

print("Also send the storage status.")
await connection.send_msg_and_await_reception_status(
FRBCStorageStatus(message_id=uuid.uuid4(), present_fill_level=10.0)
)

print("Also send the actuator status.")
await connection.send_msg_and_await_reception_status(
FRBCActuatorStatus(
message_id=uuid.uuid4(),
actuator_id=actuator_id,
active_operation_mode_id=operation_mode_id,
operation_mode_factor=0.5,
)
)

self._power_measurement_task = SendPowerMeasurementPeriodically(
connection, datetime.timedelta(seconds=3)
)
await self._power_measurement_task.start()

async def deactivate(self, connection: S2AsyncConnection) -> None:
print("The control type FRBC is now deactivated.")
if self._power_measurement_task is not None:
await self._power_measurement_task.stop()
self._power_measurement_task = None


class MyNoControlControlType(NoControlControlType):
async def activate(self, connection: S2AsyncConnection) -> None:
print("The control type NoControl is now activated.")

async def deactivate(self, connection: S2AsyncConnection) -> None:
print("The control type NoControl is now deactivated.")


async def start_s2_session(url, rm_id: uuid.UUID):
# Configure a resource manager
rm_handler = ResourceManagerHandler(
asset_details=AssetDetails(
resource_id=rm_id,
name="Some asset",
instruction_processing_delay=Duration.from_milliseconds(20),
roles=[Role(role=RoleType.ENERGY_CONSUMER, commodity=Commodity.ELECTRICITY)],
currency=Currency.EUR,
provides_forecast=False,
provides_power_measurements=[CommodityQuantity.ELECTRIC_POWER_3_PHASE_SYMMETRIC],
),
control_types=[MyFRBCControlType(), MyNoControlControlType()],
)

# Setup the underlying websocket connection
async with WebsocketClientMedium(url=url, verify_certificate=False) as ws_medium:
# Configure the S2 connection on top of the websocket connection
s2_conn = S2AsyncConnection(medium=ws_medium)
rm_handler.register_handlers(s2_conn)

eventloop = asyncio.get_running_loop()

async def stop():
print("Received signal. Will stop S2 connection.")
await s2_conn.stop()

eventloop.add_signal_handler(signal.SIGINT, lambda: eventloop.create_task(stop()))
eventloop.add_signal_handler(signal.SIGTERM, lambda: eventloop.create_task(stop()))
await s2_conn.run()


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="A simple S2 reseource manager example.")
RM_ID = uuid.uuid4()
parser.add_argument(
"--endpoint",
type=str,
required=False,
help=f"WebSocket endpoint uri for the server (CEM) e.g. ws://localhost:8003/ws/{RM_ID}",
default=f"ws://localhost:8003/ws/{RM_ID}",
)
args = parser.parse_args()

asyncio.run(start_s2_session(args.endpoint, RM_ID))
Loading
Loading