Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
* None.

### New Features
* None.
* Added support for the following `CurrentStateEvent` types:
* `AddCutEvent`.
* `RemoveCutEvent`.
* `AddJumperEvent`.
* `RemoveJumperEvent`.

### Enhancements
* `QueryNetworkStateClient.reportBatchStatus` can be used to send status responses for batches returned from the service via
Expand Down
58 changes: 54 additions & 4 deletions docs/docs/update-network-state-client.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ Now that you have a client, you can use it to update the state of the network on
The current state of the network can be updated using the `set_current_states` function on the `UpdateNetworkStateClient`. All events passed in the same list will
be executed as a batch.

:::note
The current implementation only supports updating the current state of switches. In the future we will add more functionality such as cuts and jumpers.
:::

#### Updating current switch state

The current state of switches can be updating by passing a `SwitchStateEvent` to the `set_current_states` function.
Expand All @@ -62,6 +58,60 @@ event2 = SwitchStateEvent("event2", datetime.now(), "switch_id_2", SwitchAction.
response = client.set_current_states(1, (event1, event2))
```

#### Adding cuts

You can cut an AcLineSegment in the current state of the network by passing an `AddCutEvent` to the `set_current_states` function.

```python
from datetime import datetime

from zepben.evolve import AddCutEvent

event1 = AddCutEvent("event1", datetime.now(), "cut_id", "acls_id")
response = client.set_current_states(1, (event1))
```

#### Removing cuts

You can remove a previously added cut from the current state of the network by passing a `RemoveCutEvent` to the `set_current_states` function.

```python
from datetime import datetime

from zepben.evolve import RemoveCutEvent

event1 = RemoveCutEvent("event1", datetime.now(), "cut_id")
response = client.set_current_states(1, (event1))
```

#### Adding jumpers

You can add a jumper between two other pieces of equipment in the current state of the network by passing an `AddJumperEvent` to the `set_current_states` function.

```python
from datetime import datetime

from zepben.evolve import AddJumperEvent, JumperConnection

event1 = AddJumperEvent("event1", datetime.now(), "jumper_id", JumperConnection("from_id", JumperConnection("to_id")))
response = client.set_current_states(1, (event1))
```

#### Removing jumpers

You can remove a previously added jumper from the current state of the network by passing a `RemoveJumperEvent` to the `set_current_states` function.

```python
from datetime import datetime

from zepben.evolve import RemoveJumperEvent

event1 = RemoveJumperEvent("event1", datetime.now(), "jumper_id")
response = client.set_current_states(1, (event1))
```

#### Multiple Requests

If you have multiple batches to send, you can use `set_current_states_in_batches` rather than calling `set_current_states` multiple times

```python
Expand Down
219 changes: 211 additions & 8 deletions src/zepben/evolve/streaming/data/current_state_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
__all__ = ["CurrentStateEvent", "SwitchStateEvent", "SwitchAction"]
__all__ = ["CurrentStateEvent", "SwitchStateEvent", "SwitchAction", "AddCutEvent", "RemoveCutEvent", "AddJumperEvent", "RemoveJumperEvent", "JumperConnection"]

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from enum import Enum

from zepben.protobuf.ns.data.change_events_pb2 import CurrentStateEvent as PBCurrentStateEvent, SwitchStateEvent as PBSwitchStateEvent
from zepben.protobuf.ns.data.change_events_pb2 import CurrentStateEvent as PBCurrentStateEvent, SwitchStateEvent as PBSwitchStateEvent, \
AddCutEvent as PBAddCutEvent, RemoveCutEvent as PBRemoveCutEvent, AddJumperEvent as PBAddJumperEvent, RemoveJumperEvent as PBRemoveJumperEvent, \
JumperConnection as PBJumperConnection

from zepben.evolve.model.cim.iec61970.base.core.phase_code import PhaseCode, phase_code_by_id
from zepben.evolve.util import datetime_to_timestamp
Expand All @@ -33,18 +35,26 @@ def __init__(self, event_id: str, timestamp: datetime):
@staticmethod
def from_pb(event: PBCurrentStateEvent) -> 'CurrentStateEvent':
"""
Creates a CurrentStateEvent object from a protobuf CurrentStateEvent.
Creates a `CurrentStateEvent` object from a protobuf `CurrentStateEvent`.
"""
active_event = event.WhichOneof("event")
if active_event == "switch":
return SwitchStateEvent.from_pb(event)
elif active_event == "addCut":
return AddCutEvent.from_pb(event)
elif active_event == "removeCut":
return RemoveCutEvent.from_pb(event)
elif active_event == "addJumper":
return AddJumperEvent.from_pb(event)
elif active_event == "removeJumper":
return RemoveJumperEvent.from_pb(event)
else:
raise NotImplementedError(f"'{active_event}' is currently unsupported.")

@abstractmethod
def to_pb(self) -> PBCurrentStateEvent:
"""
Creates a protobuf CurrentStateEvent object with switch from a CurrentStateEvent.
Creates a protobuf `CurrentStateEvent` object with switch from this `CurrentStateEvent`.
"""
pass

Expand Down Expand Up @@ -72,7 +82,7 @@ def __init__(self, event_id: str, timestamp: datetime, mrid: str, action: 'Switc
@staticmethod
def from_pb(event: PBCurrentStateEvent) -> 'SwitchStateEvent':
"""
Creates a SwitchStateEvent object from a protobuf CurrentStateEvent.
Creates a `SwitchStateEvent` object from a protobuf `CurrentStateEvent`.
"""
return SwitchStateEvent(
event.eventId,
Expand All @@ -84,10 +94,174 @@ def from_pb(event: PBCurrentStateEvent) -> 'SwitchStateEvent':

def to_pb(self) -> PBCurrentStateEvent:
"""
Creates a protobuf CurrentStateEvent object with switch from a SwitchStateEvent.
Creates a protobuf `CurrentStateEvent` object with `switch` from this `SwitchStateEvent`.
"""
return PBCurrentStateEvent(eventId=self.event_id, timestamp=datetime_to_timestamp(self.timestamp),
switch=PBSwitchStateEvent(mRID=self.mrid, action=self.action.name, phases=self.phases.name))
return PBCurrentStateEvent(
eventId=self.event_id,
timestamp=datetime_to_timestamp(self.timestamp),
switch=PBSwitchStateEvent(mRID=self.mrid, action=self.action.name, phases=self.phases.name)
)


@dataclass
class AddCutEvent(CurrentStateEvent):
"""
An event to add a cut to the network.

Attributes:
event_id: An identifier of this event. This must be unique across requests to allow detection of
duplicates when requesting events via dates vs those streamed via live updates.
timestamp: The timestamp when the event occurred. This is always handled as UTC (Coordinated Universal Time).
mrid: The mRID of the cut defined by this event. This should match any future remove instructions.
acls_mrid: The mRID of the AC line segment that was cut.
"""

def __init__(self, event_id: str, timestamp: datetime, mrid: str, acls_mrid: str):
super().__init__(event_id, timestamp)
self.mrid = mrid
self.acls_mrid = acls_mrid

@staticmethod
def from_pb(event: PBCurrentStateEvent) -> 'AddCutEvent':
"""
Creates an `AddCutEvent` object from a protobuf `PBCurrentStateEvent`
"""
return AddCutEvent(
event.eventId,
event.timestamp.ToDatetime(),
event.addCut.mRID,
event.addCut.aclsMRID
)

def to_pb(self) -> PBCurrentStateEvent:
"""
Creates a protobuf `PBCurrentStateEvent` object with `addCut` from this `AddCutEvent`
"""
return PBCurrentStateEvent(
eventId=self.event_id,
timestamp=datetime_to_timestamp(self.timestamp),
addCut=PBAddCutEvent(mRID=self.mrid, aclsMRID=self.acls_mrid)
)


@dataclass
class RemoveCutEvent(CurrentStateEvent):
"""
An event to remove a cut from the network.

Attributes:
event_id: An identifier of this event. This must be unique across requests to allow detection of
duplicates when requesting events via dates vs those streamed via live updates.
timestamp: The timestamp when the event occurred. This is always handled as UTC (Coordinated Universal Time).
mrid: The mRID of the cut to remove. This should match a previously added cut.
"""

def __init__(self, event_id: str, timestamp: datetime, mrid: str):
super().__init__(event_id, timestamp)
self.mrid = mrid

@staticmethod
def from_pb(event: PBCurrentStateEvent) -> 'RemoveCutEvent':
"""
Creates a `RemoveCutEvent` object from protobuf `PBCurrentStateEvent`
"""
return RemoveCutEvent(
event.eventId,
event.timestamp.ToDatetime(),
event.removeCut.mRID
)

def to_pb(self) -> PBCurrentStateEvent:
"""
Creates a protobuf `PBCurrentStateEvent` object with `removeCut` from this `RemoveCutEvent`
"""
return PBCurrentStateEvent(
eventId=self.event_id,
timestamp=datetime_to_timestamp(self.timestamp),
removeCut=PBRemoveCutEvent(mRID=self.mrid)
)


@dataclass
class AddJumperEvent(CurrentStateEvent):
"""
An event to add a jumper to the network.

Attributes:
event_id: An identifier of this event. This must be unique across requests to allow detection of
duplicates when requesting events via dates vs those streamed via live updates.
timestamp: The timestamp when the event occurred. This is always handled as UTC (Coordinated Universal Time).
mrid: The mRID of the jumper affected by this event.
from_connection: Information on how this jumper is connected at one end of the jumper.
to_connection: Information on how this jumper is connected at the other end of the jumper.
"""

def __init__(self, event_id: str, timestamp: datetime, mrid: str, from_connection: 'JumperConnection', to_connection: 'JumperConnection'):
super().__init__(event_id, timestamp)
self.mrid = mrid
self.from_connection = from_connection
self.to_connection = to_connection

@staticmethod
def from_pb(event: PBCurrentStateEvent) -> 'AddJumperEvent':
"""
Creates an `AddJumperEvent` object from protobuf `PBCurrentStateEvent`
"""
return AddJumperEvent(
event.eventId,
event.timestamp.ToDatetime(),
event.addJumper.mRID,
JumperConnection.from_pb(event.addJumper.fromConnection),
JumperConnection.from_pb(event.addJumper.toConnection)
)

def to_pb(self) -> PBCurrentStateEvent:
"""
Creates a protobuf `PBCurrentStateEvent` object with `addJumper` from this `AddJumperEvent`
"""
return PBCurrentStateEvent(
eventId=self.event_id,
timestamp=datetime_to_timestamp(self.timestamp),
addJumper=PBAddJumperEvent(mRID=self.mrid, fromConnection=self.from_connection.to_pb(), toConnection=self.to_connection.to_pb())
)


@dataclass
class RemoveJumperEvent(CurrentStateEvent):
"""
An event to remove a jumper from the network.

Attributes:
event_id: An identifier of this event. This must be unique across requests to allow detection of
duplicates when requesting events via dates vs those streamed via live updates.
timestamp: The timestamp when the event occurred. This is always handled as UTC (Coordinated Universal Time).
mrid: The mRID of the jumper to remove. This should match a previously added jumper.
"""

def __init__(self, event_id: str, timestamp: datetime, mrid: str):
super().__init__(event_id, timestamp)
self.mrid = mrid

@staticmethod
def from_pb(event: PBCurrentStateEvent) -> 'RemoveJumperEvent':
"""
Creates a `RemoveJumperEvent` object from protobuf `PBCurrentStateEvent`
"""
return RemoveJumperEvent(
event.eventId,
event.timestamp.ToDatetime(),
event.removeJumper.mRID
)

def to_pb(self) -> PBCurrentStateEvent:
"""
Creates a protobuf `PBCurrentStateEvent` object with `removeJumper` from this `RemoveJumperEvent`
"""
return PBCurrentStateEvent(
eventId=self.event_id,
timestamp=datetime_to_timestamp(self.timestamp),
removeJumper=PBRemoveJumperEvent(mRID=self.mrid)
)


class SwitchAction(Enum):
Expand All @@ -97,3 +271,32 @@ class SwitchAction(Enum):
UNKNOWN = 0 # The specified action was unknown, or was not set.
OPEN = 1 # A request to open a switch.
CLOSE = 2 # A request to close a switch.


class JumperConnection:
"""
Information about how a jumper is connected to the network.

Attributes:
connected_mrid: The mRID of the conducting equipment (or terminal) connected to this end of the jumper.
"""

def __init__(self, connected_mrid: str):
self.connected_mrid = connected_mrid

@staticmethod
def from_pb(connection: PBJumperConnection) -> 'JumperConnection':
"""
Creates a `JumperConnection` object from protobuf `PBJumperConnection`
"""
return JumperConnection(
connection.connectedMRID,
)

def to_pb(self) -> PBJumperConnection:
"""
Creates a protobuf `PBJumperConnection` object from this `JumperConnection`
"""
return PBJumperConnection(
connectedMRID=self.connected_mrid
)
10 changes: 5 additions & 5 deletions test/run_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from time import perf_counter, process_time
from typing import Callable

from zepben.evolve import connect, Feeder
from zepben.evolve import connect_insecure, Feeder
from zepben.evolve.streaming.get.network_consumer import SyncNetworkConsumerClient

rpc_port = 9001
Expand All @@ -26,7 +26,7 @@ def run_streaming():


def run_retrieve():
with connect(rpc_port=rpc_port) as channel:
with connect_insecure(rpc_port=rpc_port) as channel:
client = SyncNetworkConsumerClient(channel=channel)

client.retrieve_network().throw_on_error()
Expand All @@ -36,7 +36,7 @@ def run_retrieve():


def run_get_object():
with connect(rpc_port=rpc_port) as channel:
with connect_insecure(rpc_port=rpc_port) as channel:
client = SyncNetworkConsumerClient(channel=channel)

client.get_identified_object("21527151-6fce-423d-84e5-8254a00b05b1").throw_on_error()
Expand All @@ -46,7 +46,7 @@ def run_get_object():


def run_feeder():
with connect(rpc_port=rpc_port) as channel:
with connect_insecure(rpc_port=rpc_port) as channel:
client = SyncNetworkConsumerClient(channel=channel)

client.get_equipment_container("CTN005", Feeder).throw_on_error()
Expand All @@ -56,7 +56,7 @@ def run_feeder():


def run_network_hierarchy():
with connect(rpc_port=rpc_port) as channel:
with connect_insecure(rpc_port=rpc_port) as channel:
client = SyncNetworkConsumerClient(channel=channel)

network_hierarchy = client.get_network_hierarchy().throw_on_error().value
Expand Down
Loading