From cbb8004b8452e0b47fe4a763a7d7cb8ec034a67c Mon Sep 17 00:00:00 2001 From: Anthony Charlton Date: Wed, 26 Mar 2025 21:29:35 +1100 Subject: [PATCH] Added support for `AddCutEvent`, `RemoveCutEvent`, `AddJumperEvent` and `RemoveJumperEvent` Signed-off-by: Anthony Charlton --- changelog.md | 6 +- docs/docs/update-network-state-client.mdx | 58 ++++- .../streaming/data/current_state_event.py | 219 +++++++++++++++++- test/run_streaming.py | 10 +- .../data/test_current_state_event.py | 138 +++++++++-- 5 files changed, 388 insertions(+), 43 deletions(-) diff --git a/changelog.md b/changelog.md index 272dd07bd..63d2b035c 100644 --- a/changelog.md +++ b/changelog.md @@ -4,7 +4,11 @@ * None. ### New Features -* None. +* Added support for the following `CurrentStateEvent` types: + * `AddCutEvent`. + * `RemoveCutEvent`. + * `AddJumperEvent`. + * `RemoveJumperEvent`. ### Enhancements * `QueryNetworkStateClient.reportBatchStatus` can be used to send status responses for batches returned from the service via diff --git a/docs/docs/update-network-state-client.mdx b/docs/docs/update-network-state-client.mdx index 8c066232d..f2f6f6bf4 100644 --- a/docs/docs/update-network-state-client.mdx +++ b/docs/docs/update-network-state-client.mdx @@ -44,10 +44,6 @@ Now that you have a client, you can use it to update the state of the network on The current state of the network can be updated using the `set_current_states` function on the `UpdateNetworkStateClient`. All events passed in the same list will be executed as a batch. -:::note -The current implementation only supports updating the current state of switches. In the future we will add more functionality such as cuts and jumpers. -::: - #### Updating current switch state The current state of switches can be updating by passing a `SwitchStateEvent` to the `set_current_states` function. @@ -62,6 +58,60 @@ event2 = SwitchStateEvent("event2", datetime.now(), "switch_id_2", SwitchAction. response = client.set_current_states(1, (event1, event2)) ``` +#### Adding cuts + +You can cut an AcLineSegment in the current state of the network by passing an `AddCutEvent` to the `set_current_states` function. + +```python +from datetime import datetime + +from zepben.evolve import AddCutEvent + +event1 = AddCutEvent("event1", datetime.now(), "cut_id", "acls_id") +response = client.set_current_states(1, (event1)) +``` + +#### Removing cuts + +You can remove a previously added cut from the current state of the network by passing a `RemoveCutEvent` to the `set_current_states` function. + +```python +from datetime import datetime + +from zepben.evolve import RemoveCutEvent + +event1 = RemoveCutEvent("event1", datetime.now(), "cut_id") +response = client.set_current_states(1, (event1)) +``` + +#### Adding jumpers + +You can add a jumper between two other pieces of equipment in the current state of the network by passing an `AddJumperEvent` to the `set_current_states` function. + +```python +from datetime import datetime + +from zepben.evolve import AddJumperEvent, JumperConnection + +event1 = AddJumperEvent("event1", datetime.now(), "jumper_id", JumperConnection("from_id", JumperConnection("to_id"))) +response = client.set_current_states(1, (event1)) +``` + +#### Removing jumpers + +You can remove a previously added jumper from the current state of the network by passing a `RemoveJumperEvent` to the `set_current_states` function. + +```python +from datetime import datetime + +from zepben.evolve import RemoveJumperEvent + +event1 = RemoveJumperEvent("event1", datetime.now(), "jumper_id") +response = client.set_current_states(1, (event1)) +``` + +#### Multiple Requests + If you have multiple batches to send, you can use `set_current_states_in_batches` rather than calling `set_current_states` multiple times ```python diff --git a/src/zepben/evolve/streaming/data/current_state_event.py b/src/zepben/evolve/streaming/data/current_state_event.py index 8c81e7095..546045d0a 100644 --- a/src/zepben/evolve/streaming/data/current_state_event.py +++ b/src/zepben/evolve/streaming/data/current_state_event.py @@ -2,14 +2,16 @@ # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at https://mozilla.org/MPL/2.0/. -__all__ = ["CurrentStateEvent", "SwitchStateEvent", "SwitchAction"] +__all__ = ["CurrentStateEvent", "SwitchStateEvent", "SwitchAction", "AddCutEvent", "RemoveCutEvent", "AddJumperEvent", "RemoveJumperEvent", "JumperConnection"] from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import datetime from enum import Enum -from zepben.protobuf.ns.data.change_events_pb2 import CurrentStateEvent as PBCurrentStateEvent, SwitchStateEvent as PBSwitchStateEvent +from zepben.protobuf.ns.data.change_events_pb2 import CurrentStateEvent as PBCurrentStateEvent, SwitchStateEvent as PBSwitchStateEvent, \ + AddCutEvent as PBAddCutEvent, RemoveCutEvent as PBRemoveCutEvent, AddJumperEvent as PBAddJumperEvent, RemoveJumperEvent as PBRemoveJumperEvent, \ + JumperConnection as PBJumperConnection from zepben.evolve.model.cim.iec61970.base.core.phase_code import PhaseCode, phase_code_by_id from zepben.evolve.util import datetime_to_timestamp @@ -33,18 +35,26 @@ def __init__(self, event_id: str, timestamp: datetime): @staticmethod def from_pb(event: PBCurrentStateEvent) -> 'CurrentStateEvent': """ - Creates a CurrentStateEvent object from a protobuf CurrentStateEvent. + Creates a `CurrentStateEvent` object from a protobuf `CurrentStateEvent`. """ active_event = event.WhichOneof("event") if active_event == "switch": return SwitchStateEvent.from_pb(event) + elif active_event == "addCut": + return AddCutEvent.from_pb(event) + elif active_event == "removeCut": + return RemoveCutEvent.from_pb(event) + elif active_event == "addJumper": + return AddJumperEvent.from_pb(event) + elif active_event == "removeJumper": + return RemoveJumperEvent.from_pb(event) else: raise NotImplementedError(f"'{active_event}' is currently unsupported.") @abstractmethod def to_pb(self) -> PBCurrentStateEvent: """ - Creates a protobuf CurrentStateEvent object with switch from a CurrentStateEvent. + Creates a protobuf `CurrentStateEvent` object with switch from this `CurrentStateEvent`. """ pass @@ -72,7 +82,7 @@ def __init__(self, event_id: str, timestamp: datetime, mrid: str, action: 'Switc @staticmethod def from_pb(event: PBCurrentStateEvent) -> 'SwitchStateEvent': """ - Creates a SwitchStateEvent object from a protobuf CurrentStateEvent. + Creates a `SwitchStateEvent` object from a protobuf `CurrentStateEvent`. """ return SwitchStateEvent( event.eventId, @@ -84,10 +94,174 @@ def from_pb(event: PBCurrentStateEvent) -> 'SwitchStateEvent': def to_pb(self) -> PBCurrentStateEvent: """ - Creates a protobuf CurrentStateEvent object with switch from a SwitchStateEvent. + Creates a protobuf `CurrentStateEvent` object with `switch` from this `SwitchStateEvent`. """ - return PBCurrentStateEvent(eventId=self.event_id, timestamp=datetime_to_timestamp(self.timestamp), - switch=PBSwitchStateEvent(mRID=self.mrid, action=self.action.name, phases=self.phases.name)) + return PBCurrentStateEvent( + eventId=self.event_id, + timestamp=datetime_to_timestamp(self.timestamp), + switch=PBSwitchStateEvent(mRID=self.mrid, action=self.action.name, phases=self.phases.name) + ) + + +@dataclass +class AddCutEvent(CurrentStateEvent): + """ + An event to add a cut to the network. + + Attributes: + event_id: An identifier of this event. This must be unique across requests to allow detection of + duplicates when requesting events via dates vs those streamed via live updates. + timestamp: The timestamp when the event occurred. This is always handled as UTC (Coordinated Universal Time). + mrid: The mRID of the cut defined by this event. This should match any future remove instructions. + acls_mrid: The mRID of the AC line segment that was cut. + """ + + def __init__(self, event_id: str, timestamp: datetime, mrid: str, acls_mrid: str): + super().__init__(event_id, timestamp) + self.mrid = mrid + self.acls_mrid = acls_mrid + + @staticmethod + def from_pb(event: PBCurrentStateEvent) -> 'AddCutEvent': + """ + Creates an `AddCutEvent` object from a protobuf `PBCurrentStateEvent` + """ + return AddCutEvent( + event.eventId, + event.timestamp.ToDatetime(), + event.addCut.mRID, + event.addCut.aclsMRID + ) + + def to_pb(self) -> PBCurrentStateEvent: + """ + Creates a protobuf `PBCurrentStateEvent` object with `addCut` from this `AddCutEvent` + """ + return PBCurrentStateEvent( + eventId=self.event_id, + timestamp=datetime_to_timestamp(self.timestamp), + addCut=PBAddCutEvent(mRID=self.mrid, aclsMRID=self.acls_mrid) + ) + + +@dataclass +class RemoveCutEvent(CurrentStateEvent): + """ + An event to remove a cut from the network. + + Attributes: + event_id: An identifier of this event. This must be unique across requests to allow detection of + duplicates when requesting events via dates vs those streamed via live updates. + timestamp: The timestamp when the event occurred. This is always handled as UTC (Coordinated Universal Time). + mrid: The mRID of the cut to remove. This should match a previously added cut. + """ + + def __init__(self, event_id: str, timestamp: datetime, mrid: str): + super().__init__(event_id, timestamp) + self.mrid = mrid + + @staticmethod + def from_pb(event: PBCurrentStateEvent) -> 'RemoveCutEvent': + """ + Creates a `RemoveCutEvent` object from protobuf `PBCurrentStateEvent` + """ + return RemoveCutEvent( + event.eventId, + event.timestamp.ToDatetime(), + event.removeCut.mRID + ) + + def to_pb(self) -> PBCurrentStateEvent: + """ + Creates a protobuf `PBCurrentStateEvent` object with `removeCut` from this `RemoveCutEvent` + """ + return PBCurrentStateEvent( + eventId=self.event_id, + timestamp=datetime_to_timestamp(self.timestamp), + removeCut=PBRemoveCutEvent(mRID=self.mrid) + ) + + +@dataclass +class AddJumperEvent(CurrentStateEvent): + """ + An event to add a jumper to the network. + + Attributes: + event_id: An identifier of this event. This must be unique across requests to allow detection of + duplicates when requesting events via dates vs those streamed via live updates. + timestamp: The timestamp when the event occurred. This is always handled as UTC (Coordinated Universal Time). + mrid: The mRID of the jumper affected by this event. + from_connection: Information on how this jumper is connected at one end of the jumper. + to_connection: Information on how this jumper is connected at the other end of the jumper. + """ + + def __init__(self, event_id: str, timestamp: datetime, mrid: str, from_connection: 'JumperConnection', to_connection: 'JumperConnection'): + super().__init__(event_id, timestamp) + self.mrid = mrid + self.from_connection = from_connection + self.to_connection = to_connection + + @staticmethod + def from_pb(event: PBCurrentStateEvent) -> 'AddJumperEvent': + """ + Creates an `AddJumperEvent` object from protobuf `PBCurrentStateEvent` + """ + return AddJumperEvent( + event.eventId, + event.timestamp.ToDatetime(), + event.addJumper.mRID, + JumperConnection.from_pb(event.addJumper.fromConnection), + JumperConnection.from_pb(event.addJumper.toConnection) + ) + + def to_pb(self) -> PBCurrentStateEvent: + """ + Creates a protobuf `PBCurrentStateEvent` object with `addJumper` from this `AddJumperEvent` + """ + return PBCurrentStateEvent( + eventId=self.event_id, + timestamp=datetime_to_timestamp(self.timestamp), + addJumper=PBAddJumperEvent(mRID=self.mrid, fromConnection=self.from_connection.to_pb(), toConnection=self.to_connection.to_pb()) + ) + + +@dataclass +class RemoveJumperEvent(CurrentStateEvent): + """ + An event to remove a jumper from the network. + + Attributes: + event_id: An identifier of this event. This must be unique across requests to allow detection of + duplicates when requesting events via dates vs those streamed via live updates. + timestamp: The timestamp when the event occurred. This is always handled as UTC (Coordinated Universal Time). + mrid: The mRID of the jumper to remove. This should match a previously added jumper. + """ + + def __init__(self, event_id: str, timestamp: datetime, mrid: str): + super().__init__(event_id, timestamp) + self.mrid = mrid + + @staticmethod + def from_pb(event: PBCurrentStateEvent) -> 'RemoveJumperEvent': + """ + Creates a `RemoveJumperEvent` object from protobuf `PBCurrentStateEvent` + """ + return RemoveJumperEvent( + event.eventId, + event.timestamp.ToDatetime(), + event.removeJumper.mRID + ) + + def to_pb(self) -> PBCurrentStateEvent: + """ + Creates a protobuf `PBCurrentStateEvent` object with `removeJumper` from this `RemoveJumperEvent` + """ + return PBCurrentStateEvent( + eventId=self.event_id, + timestamp=datetime_to_timestamp(self.timestamp), + removeJumper=PBRemoveJumperEvent(mRID=self.mrid) + ) class SwitchAction(Enum): @@ -97,3 +271,32 @@ class SwitchAction(Enum): UNKNOWN = 0 # The specified action was unknown, or was not set. OPEN = 1 # A request to open a switch. CLOSE = 2 # A request to close a switch. + + +class JumperConnection: + """ + Information about how a jumper is connected to the network. + + Attributes: + connected_mrid: The mRID of the conducting equipment (or terminal) connected to this end of the jumper. + """ + + def __init__(self, connected_mrid: str): + self.connected_mrid = connected_mrid + + @staticmethod + def from_pb(connection: PBJumperConnection) -> 'JumperConnection': + """ + Creates a `JumperConnection` object from protobuf `PBJumperConnection` + """ + return JumperConnection( + connection.connectedMRID, + ) + + def to_pb(self) -> PBJumperConnection: + """ + Creates a protobuf `PBJumperConnection` object from this `JumperConnection` + """ + return PBJumperConnection( + connectedMRID=self.connected_mrid + ) diff --git a/test/run_streaming.py b/test/run_streaming.py index 64b3c8653..8425af389 100644 --- a/test/run_streaming.py +++ b/test/run_streaming.py @@ -7,7 +7,7 @@ from time import perf_counter, process_time from typing import Callable -from zepben.evolve import connect, Feeder +from zepben.evolve import connect_insecure, Feeder from zepben.evolve.streaming.get.network_consumer import SyncNetworkConsumerClient rpc_port = 9001 @@ -26,7 +26,7 @@ def run_streaming(): def run_retrieve(): - with connect(rpc_port=rpc_port) as channel: + with connect_insecure(rpc_port=rpc_port) as channel: client = SyncNetworkConsumerClient(channel=channel) client.retrieve_network().throw_on_error() @@ -36,7 +36,7 @@ def run_retrieve(): def run_get_object(): - with connect(rpc_port=rpc_port) as channel: + with connect_insecure(rpc_port=rpc_port) as channel: client = SyncNetworkConsumerClient(channel=channel) client.get_identified_object("21527151-6fce-423d-84e5-8254a00b05b1").throw_on_error() @@ -46,7 +46,7 @@ def run_get_object(): def run_feeder(): - with connect(rpc_port=rpc_port) as channel: + with connect_insecure(rpc_port=rpc_port) as channel: client = SyncNetworkConsumerClient(channel=channel) client.get_equipment_container("CTN005", Feeder).throw_on_error() @@ -56,7 +56,7 @@ def run_feeder(): def run_network_hierarchy(): - with connect(rpc_port=rpc_port) as channel: + with connect_insecure(rpc_port=rpc_port) as channel: client = SyncNetworkConsumerClient(channel=channel) network_hierarchy = client.get_network_hierarchy().throw_on_error().value diff --git a/test/streaming/data/test_current_state_event.py b/test/streaming/data/test_current_state_event.py index 1bc0f2a76..c834908e0 100644 --- a/test/streaming/data/test_current_state_event.py +++ b/test/streaming/data/test_current_state_event.py @@ -9,41 +9,129 @@ from zepben.protobuf.cim.iec61970.base.core.PhaseCode_pb2 import PhaseCode as PBPhaseCode from zepben.protobuf.ns.data.change_events_pb2 import CurrentStateEvent as PBCurrentStateEvent, SwitchStateEvent as PBSwitchStateEvent, \ AddCutEvent as PBAddCutEvent, RemoveCutEvent as PBRemoveCutEvent, AddJumperEvent as PBAddJumperEvent, RemoveJumperEvent as PBRemoveJumperEvent, \ - SwitchAction as PBSwitchAction + SwitchAction as PBSwitchAction, JumperConnection as PBJumperConnection -from zepben.evolve import PhaseCode, datetime_to_timestamp, CurrentStateEvent, SwitchStateEvent, SwitchAction - - -def _test_from_pb_not_implemented(event: PBCurrentStateEvent): - with pytest.raises(NotImplementedError): - CurrentStateEvent.from_pb(event) +from zepben.evolve import PhaseCode, datetime_to_timestamp, CurrentStateEvent, SwitchStateEvent, SwitchAction, AddCutEvent, RemoveCutEvent, AddJumperEvent, \ + RemoveJumperEvent class TestCurrentStateEvent: def test_from_pb(self): - switch_event = CurrentStateEvent.from_pb(PBCurrentStateEvent(switch=PBSwitchStateEvent())) - assert isinstance(switch_event, SwitchStateEvent) + event = CurrentStateEvent.from_pb(PBCurrentStateEvent(switch=PBSwitchStateEvent())) + assert isinstance(event, SwitchStateEvent) + + event = CurrentStateEvent.from_pb(PBCurrentStateEvent(addCut=PBAddCutEvent())) + assert isinstance(event, AddCutEvent) + + event = CurrentStateEvent.from_pb(PBCurrentStateEvent(removeCut=PBRemoveCutEvent())) + assert isinstance(event, RemoveCutEvent) + + event = CurrentStateEvent.from_pb(PBCurrentStateEvent(addJumper=PBAddJumperEvent())) + assert isinstance(event, AddJumperEvent) + + event = CurrentStateEvent.from_pb(PBCurrentStateEvent(removeJumper=PBRemoveJumperEvent())) + assert isinstance(event, RemoveJumperEvent) def test_from_pb_not_implemented(self): - _test_from_pb_not_implemented(PBCurrentStateEvent(addCut=PBAddCutEvent())) - _test_from_pb_not_implemented(PBCurrentStateEvent(removeCut=PBRemoveCutEvent())) - _test_from_pb_not_implemented(PBCurrentStateEvent(addJumper=PBAddJumperEvent())) - _test_from_pb_not_implemented(PBCurrentStateEvent(removeJumper=PBRemoveJumperEvent())) - - def test_switch_state_event_protobuf_conversion(self): - pb_event = PBCurrentStateEvent(eventId="event1", timestamp=datetime_to_timestamp(datetime.now()), - switch=PBSwitchStateEvent(mRID="switch-1", action=PBSwitchAction.OPEN, phases=PBPhaseCode.ABCN)) - switch_state_event = SwitchStateEvent.from_pb(pb_event) - assert switch_state_event.event_id == pb_event.eventId - assert switch_state_event.timestamp == pb_event.timestamp.ToDatetime() - assert switch_state_event.mrid == pb_event.switch.mRID - assert switch_state_event.action == SwitchAction.OPEN - assert switch_state_event.phases == PhaseCode.ABCN - - pb = switch_state_event.to_pb() + with pytest.raises(NotImplementedError): + CurrentStateEvent.from_pb(PBCurrentStateEvent()) + + def test_event_protobuf_conversion(self): + pb_event = PBCurrentStateEvent( + eventId="event1", + timestamp=datetime_to_timestamp(datetime.now()), + switch=PBSwitchStateEvent(mRID="switch-1", action=PBSwitchAction.OPEN, phases=PBPhaseCode.ABCN) + ) + + event = SwitchStateEvent.from_pb(pb_event) + assert event.event_id == pb_event.eventId + assert event.timestamp == pb_event.timestamp.ToDatetime() + assert event.mrid == pb_event.switch.mRID + assert event.action == SwitchAction.OPEN + assert event.phases == PhaseCode.ABCN + + pb = event.to_pb() assert pb.eventId == pb_event.eventId assert pb.timestamp == pb_event.timestamp assert pb.switch.mRID == pb_event.switch.mRID assert pb.switch.action == pb_event.switch.action assert pb.switch.phases == pb_event.switch.phases + + def test_add_cut_event_protobuf_conversion(self): + pb_event = PBCurrentStateEvent( + eventId="event1", + timestamp=datetime_to_timestamp(datetime.now()), + addCut=PBAddCutEvent(mRID="cut-1", aclsMRID="acls-1") + ) + + event = AddCutEvent.from_pb(pb_event) + assert event.event_id == pb_event.eventId + assert event.timestamp == pb_event.timestamp.ToDatetime() + assert event.mrid == pb_event.addCut.mRID + assert event.acls_mrid == pb_event.addCut.aclsMRID + + pb = event.to_pb() + assert pb.eventId == pb_event.eventId + assert pb.timestamp == pb_event.timestamp + assert pb.addCut.mRID == pb_event.addCut.mRID + assert pb.addCut.aclsMRID == pb_event.addCut.aclsMRID + + def test_remove_cut_event_protobuf_conversion(self): + pb_event = PBCurrentStateEvent( + eventId="event1", + timestamp=datetime_to_timestamp(datetime.now()), + removeCut=PBRemoveCutEvent(mRID="cut-1") + ) + + event = RemoveCutEvent.from_pb(pb_event) + assert event.event_id == pb_event.eventId + assert event.timestamp == pb_event.timestamp.ToDatetime() + assert event.mrid == pb_event.removeCut.mRID + + pb = event.to_pb() + assert pb.eventId == pb_event.eventId + assert pb.timestamp == pb_event.timestamp + assert pb.removeCut.mRID == pb_event.removeCut.mRID + + def test_add_jumper_event_protobuf_conversion(self): + pb_event = PBCurrentStateEvent( + eventId="event1", + timestamp=datetime_to_timestamp(datetime.now()), + addJumper=PBAddJumperEvent( + mRID="jumper-1", + fromConnection=PBJumperConnection(connectedMRID="from-id"), + toConnection=PBJumperConnection(connectedMRID="to-id") + ) + ) + + event = AddJumperEvent.from_pb(pb_event) + assert event.event_id == pb_event.eventId + assert event.timestamp == pb_event.timestamp.ToDatetime() + assert event.mrid == pb_event.addJumper.mRID + assert event.from_connection.connected_mrid == pb_event.addJumper.fromConnection.connectedMRID + assert event.to_connection.connected_mrid == pb_event.addJumper.toConnection.connectedMRID + + pb = event.to_pb() + assert pb.eventId == pb_event.eventId + assert pb.timestamp == pb_event.timestamp + assert pb.addJumper.mRID == pb_event.addJumper.mRID + assert pb.addJumper.fromConnection.connectedMRID == pb_event.addJumper.fromConnection.connectedMRID + assert pb.addJumper.toConnection.connectedMRID == pb_event.addJumper.toConnection.connectedMRID + + def test_remove_jumper_event_protobuf_conversion(self): + pb_event = PBCurrentStateEvent( + eventId="event1", + timestamp=datetime_to_timestamp(datetime.now()), + removeJumper=PBRemoveJumperEvent(mRID="jumper-1") + ) + + event = RemoveJumperEvent.from_pb(pb_event) + assert event.event_id == pb_event.eventId + assert event.timestamp == pb_event.timestamp.ToDatetime() + assert event.mrid == pb_event.removeJumper.mRID + + pb = event.to_pb() + assert pb.eventId == pb_event.eventId + assert pb.timestamp == pb_event.timestamp + assert pb.removeJumper.mRID == pb_event.removeJumper.mRID