From 7b2f2c12e0df4393c81186225e3630a8c44ff471 Mon Sep 17 00:00:00 2001 From: Thomas Coratger <60488569+tcoratger@users.noreply.github.com> Date: Wed, 15 Apr 2026 15:55:51 +0200 Subject: [PATCH] test(networking): add gossipsub handler behavioral test vectors Introduces a new GossipsubHandlerTest fixture that tests protocol DECISIONS, not wire encoding. Each vector defines an initial state (mesh topology, peers, caches, backoffs), an incoming RPC event, and captures the expected outbound RPCs plus resulting mesh state. This is the first behavioral test fixture for gossipsub. It complements the existing encoding vectors (RPC protobuf, message IDs, topic strings) by testing the logic layer: what does the node DO when it receives a GRAFT, PRUNE, IHAVE, IWANT, or published message? 14 test vectors covering: - GRAFT: accept, reject (capacity), reject (backoff), ignore (unsubscribed), idempotent - PRUNE: remove from mesh with backoff, zero backoff - IHAVE: unseen triggers IWANT, seen produces no IWANT, mixed filtering - IWANT: cached message returned - Message: forward to mesh except sender, duplicate not forwarded, IDONTWANT peer skipped Co-Authored-By: Claude Opus 4.6 (1M context) --- .../testing/src/consensus_testing/__init__.py | 4 + .../test_fixtures/__init__.py | 2 + .../test_fixtures/gossipsub_handler.py | 314 ++++++++++++++++++ .../networking/test_gossipsub_handlers.py | 293 ++++++++++++++++ 4 files changed, 613 insertions(+) create mode 100644 packages/testing/src/consensus_testing/test_fixtures/gossipsub_handler.py create mode 100644 tests/consensus/devnet/networking/test_gossipsub_handlers.py diff --git a/packages/testing/src/consensus_testing/__init__.py b/packages/testing/src/consensus_testing/__init__.py index 9f0157f9..077bc352 100644 --- a/packages/testing/src/consensus_testing/__init__.py +++ b/packages/testing/src/consensus_testing/__init__.py @@ -7,6 +7,7 @@ from .test_fixtures import ( BaseConsensusFixture, ForkChoiceTest, + GossipsubHandlerTest, NetworkingCodecTest, SSZTest, StateTransitionTest, @@ -34,6 +35,7 @@ VerifySignaturesTestFiller = Type[VerifySignaturesTest] SSZTestFiller = Type[SSZTest] NetworkingCodecTestFiller = Type[NetworkingCodecTest] +GossipsubHandlerTestFiller = Type[GossipsubHandlerTest] __all__ = [ # Public API @@ -51,6 +53,7 @@ "VerifySignaturesTest", "SSZTest", "NetworkingCodecTest", + "GossipsubHandlerTest", # Test types "BaseForkChoiceStep", "TickStep", @@ -68,4 +71,5 @@ "VerifySignaturesTestFiller", "SSZTestFiller", "NetworkingCodecTestFiller", + "GossipsubHandlerTestFiller", ] diff --git a/packages/testing/src/consensus_testing/test_fixtures/__init__.py b/packages/testing/src/consensus_testing/test_fixtures/__init__.py index a2424194..8ceb9b99 100644 --- a/packages/testing/src/consensus_testing/test_fixtures/__init__.py +++ b/packages/testing/src/consensus_testing/test_fixtures/__init__.py @@ -2,6 +2,7 @@ from .base import BaseConsensusFixture from .fork_choice import ForkChoiceTest +from .gossipsub_handler import GossipsubHandlerTest from .networking_codec import NetworkingCodecTest from .ssz import SSZTest from .state_transition import StateTransitionTest @@ -14,4 +15,5 @@ "VerifySignaturesTest", "SSZTest", "NetworkingCodecTest", + "GossipsubHandlerTest", ] diff --git a/packages/testing/src/consensus_testing/test_fixtures/gossipsub_handler.py b/packages/testing/src/consensus_testing/test_fixtures/gossipsub_handler.py new file mode 100644 index 00000000..aa8ea626 --- /dev/null +++ b/packages/testing/src/consensus_testing/test_fixtures/gossipsub_handler.py @@ -0,0 +1,314 @@ +""" +Gossipsub handler test fixture for protocol behavior conformance. + +Generates JSON test vectors that assert gossipsub protocol decisions. +Each vector captures an initial peer/mesh/cache state, an incoming RPC event, +and the expected outbound RPCs plus resulting mesh topology. + +The fixture tests protocol logic only, not wire encoding or I/O. +""" + +import asyncio +from dataclasses import dataclass, field +from typing import Any, ClassVar +from unittest.mock import patch + +from lean_spec.subspecs.networking import PeerId +from lean_spec.subspecs.networking.gossipsub.behavior import GossipsubBehavior, PeerState +from lean_spec.subspecs.networking.gossipsub.message import GossipsubMessage +from lean_spec.subspecs.networking.gossipsub.parameters import GossipsubParameters +from lean_spec.subspecs.networking.gossipsub.rpc import ( + RPC, + ControlGraft, + ControlIDontWant, + ControlIHave, + ControlIWant, + ControlMessage, + ControlPrune, + Message, +) +from lean_spec.subspecs.networking.gossipsub.types import MessageId, Timestamp, TopicId + +from .base import BaseConsensusFixture + +# Sentinel that satisfies `outbound_stream is not None` checks. +# The patched _send_rpc never touches the stream, so any non-None value works. +_FAKE_STREAM: Any = object() + + +@dataclass +class _SendCapture: + """Records each outbound RPC as a (peer, rpc) pair.""" + + sent: list[tuple[PeerId, RPC]] = field(default_factory=list) + + async def __call__(self, peer_id: PeerId, rpc: RPC) -> None: + self.sent.append((peer_id, rpc)) + + +def _peer_id(name: str) -> PeerId: + """Convert a short test name to a PeerId.""" + return PeerId.from_base58(name) + + +def _unhex(hex_str: str) -> bytes: + """Decode a 0x-prefixed hex string to bytes.""" + return bytes.fromhex(hex_str.removeprefix("0x")) + + +def _serialize_rpcs( + sent: list[tuple[PeerId, RPC]], peer_names: dict[PeerId, str] +) -> list[dict[str, Any]]: + """ + Convert captured outbound RPCs to JSON-friendly dicts. + + Each entry in the output list represents one RPC sent to a peer. + The structure mirrors the gossipsub RPC wire format: + + - toPeer: human-readable peer name + - subscriptions: topic subscribe/unsubscribe changes (if any) + - publish: forwarded messages with hex-encoded data (if any) + - control: GRAFT, PRUNE, IWANT, IDONTWANT sub-messages (if any) + + Fixture consumers use this to assert exact outbound behavior. + """ + result = [] + for pid, rpc in sent: + name = peer_names.get(pid, str(pid)) + entry: dict[str, Any] = {"toPeer": name} + + if rpc.subscriptions: + entry["subscriptions"] = [ + {"subscribe": s.subscribe, "topicId": str(s.topic_id)} for s in rpc.subscriptions + ] + + if rpc.publish: + entry["publish"] = [ + {"topic": str(m.topic), "data": "0x" + m.data.hex()} for m in rpc.publish + ] + + # Only include control fields that carry sub-messages. + if rpc.control and not rpc.control.is_empty(): + ctrl: dict[str, Any] = {} + if rpc.control.graft: + ctrl["graft"] = [{"topicId": str(g.topic_id)} for g in rpc.control.graft] + if rpc.control.prune: + ctrl["prune"] = [ + {"topicId": str(p.topic_id), "backoff": p.backoff} for p in rpc.control.prune + ] + if rpc.control.iwant: + ctrl["iwant"] = [ + {"messageIds": ["0x" + mid.hex() for mid in w.message_ids]} + for w in rpc.control.iwant + ] + if rpc.control.idontwant: + ctrl["idontwant"] = [ + {"messageIds": ["0x" + mid.hex() for mid in d.message_ids]} + for d in rpc.control.idontwant + ] + entry["control"] = ctrl + + result.append(entry) + return result + + +def _serialize_meshes( + behavior: GossipsubBehavior, peer_names: dict[PeerId, str] +) -> dict[str, list[str]]: + """ + Snapshot the mesh topology after handler execution. + + Returns a dict mapping each topic to a sorted list of peer names. + Sorting ensures deterministic output for fixture comparison. + """ + return { + str(topic): sorted(peer_names.get(p, str(p)) for p in behavior.mesh.get_mesh_peers(topic)) + for topic in behavior.mesh.subscriptions + } + + +class GossipsubHandlerTest(BaseConsensusFixture): + """Fixture for gossipsub handler behavior conformance. + + Tests protocol decisions: given initial state + incoming event, + what RPCs are sent and how does the mesh change? + + JSON output: params, initialState, event, now, expected. + """ + + format_name: ClassVar[str] = "gossipsub_handler" + description: ClassVar[str] = "Tests gossipsub handler protocol decisions" + + handler_name: str + """Handler being tested: graft, prune, ihave, iwant, message.""" + + params: dict[str, int] + """Gossipsub parameters: d, dLow, dHigh, dLazy.""" + + initial_state: dict[str, Any] + """Initial behavior state: subscriptions, meshes, peers, caches.""" + + event: dict[str, Any] + """Incoming event: fromPeer + RPC content.""" + + now: float = 1000.0 + """Current timestamp for backoff checks.""" + + expected: dict[str, Any] = {} + """Expected output. Filled by make_fixture.""" + + def make_fixture(self) -> "GossipsubHandlerTest": + """Produce the completed fixture with expected outputs filled in.""" + expected = asyncio.run(self._execute()) + return self.model_copy(update={"expected": expected}) + + async def _execute(self) -> dict[str, Any]: + """ + Run the handler against a fully-configured behavior instance. + + Builds the gossipsub behavior from fixture inputs, dispatches the + incoming RPC, and returns the outbound RPCs and final mesh state. + """ + gs_params = GossipsubParameters( + d=self.params.get("d", 8), + d_low=self.params.get("dLow", 6), + d_high=self.params.get("dHigh", 12), + d_lazy=self.params.get("dLazy", 6), + ) + behavior = GossipsubBehavior(params=gs_params) + + # Intercept outbound RPCs instead of sending them over the network. + capture = _SendCapture() + behavior._send_rpc = capture # type: ignore[assignment] + + # Map between human-readable test names and opaque peer identifiers. + peer_names: dict[PeerId, str] = {} + + # Subscriptions define which topics the local node participates in. + # + # Handlers ignore messages for topics we are not subscribed to. + for topic in self.initial_state.get("subscriptions", []): + behavior.mesh.subscribe(TopicId(topic)) + + # Register each peer with its subscriptions and protocol state. + # + # Peer properties like backoff timers and IDONTWANT sets directly + # influence handler decisions (e.g., reject GRAFTs, skip forwarding). + for name, info in self.initial_state.get("peers", {}).items(): + pid = _peer_id(name) + peer_names[pid] = name + state = PeerState( + peer_id=pid, + subscriptions={TopicId(t) for t in info.get("subscriptions", [])}, + outbound_stream=_FAKE_STREAM if info.get("withStream", True) else None, + ) + + # Backoff prevents re-GRAFTing a recently-pruned peer. + for topic_str, expiry in info.get("backoff", {}).items(): + state.backoff[TopicId(topic_str)] = expiry + + # IDONTWANT suppresses forwarding to peers that already have the message. + for mid_hex in info.get("dontWantIds", []): + state.dont_want_ids.add(MessageId(_unhex(mid_hex))) + behavior._peers[pid] = state + + # Mesh topology determines who receives forwarded messages. + # + # Handlers check mesh membership for GRAFT acceptance, PRUNE removal, + # and message forwarding decisions. + for topic_str, peer_list in self.initial_state.get("meshes", {}).items(): + topic = TopicId(topic_str) + for name in peer_list: + behavior.mesh.add_to_mesh(topic, _peer_id(name)) + + # Seen cache tracks previously-received message IDs. + # + # Duplicate messages are silently dropped; IHAVE for seen IDs + # does not trigger an IWANT response. + for mid_hex in self.initial_state.get("seenMessageIds", []): + behavior.seen_cache.add(MessageId(_unhex(mid_hex)), Timestamp(self.now)) + + # Message cache holds full message payloads for IWANT responses. + # + # When a peer requests a message via IWANT, the handler looks it up + # here and sends the payload back. + for entry in self.initial_state.get("cachedMessages", []): + msg = GossipsubMessage( + topic=entry["topic"].encode("utf-8"), + raw_data=_unhex(entry["data"]), + ) + msg._cached_id = MessageId(_unhex(entry["messageId"])) + behavior.message_cache.put(TopicId(entry["topic"]), msg) + + # Build the incoming RPC from the event. + from_peer = _peer_id(self.event["fromPeer"]) + peer_names.setdefault(from_peer, self.event["fromPeer"]) + + # Fix the clock so backoff and TTL checks are deterministic. + with patch("time.time", return_value=self.now): + await behavior._handle_rpc(from_peer, _build_event_rpc(self.event)) + + return { + "sentRpcs": _serialize_rpcs(capture.sent, peer_names), + "meshAfter": _serialize_meshes(behavior, peer_names), + } + + +def _build_event_rpc(event: dict[str, Any]) -> RPC: + """ + Construct an RPC from the event dict supplied by the test fixture. + + The event dict describes one incoming message from a peer. + Required key: + + - fromPeer: short name of the sending peer + + Optional keys (include one or more to build the RPC): + + - graft: list of dicts with topicId + - prune: list of dicts with topicId and optional backoff + - ihave: list of dicts with topicId and hex messageIds + - iwant: list of dicts with hex messageIds + - idontwant: list of dicts with hex messageIds + - publish: list of dicts with topic and hex data + """ + control_parts: dict[str, list[Any]] = {} + + if "graft" in event: + control_parts["graft"] = [ + ControlGraft(topic_id=TopicId(g["topicId"])) for g in event["graft"] + ] + if "prune" in event: + control_parts["prune"] = [ + ControlPrune(topic_id=TopicId(p["topicId"]), backoff=p.get("backoff", 0)) + for p in event["prune"] + ] + if "ihave" in event: + control_parts["ihave"] = [ + ControlIHave( + topic_id=TopicId(ih["topicId"]), + message_ids=[_unhex(m) for m in ih.get("messageIds", [])], + ) + for ih in event["ihave"] + ] + if "iwant" in event: + control_parts["iwant"] = [ + ControlIWant(message_ids=[_unhex(m) for m in iw.get("messageIds", [])]) + for iw in event["iwant"] + ] + if "idontwant" in event: + control_parts["idontwant"] = [ + ControlIDontWant(message_ids=[_unhex(m) for m in idw.get("messageIds", [])]) + for idw in event["idontwant"] + ] + + return RPC( + publish=[ + Message( + topic=TopicId(m.get("topic", "")), + data=_unhex(m["data"]) if m.get("data") else b"", + ) + for m in event.get("publish", []) + ], + control=ControlMessage(**control_parts) if control_parts else None, + ) diff --git a/tests/consensus/devnet/networking/test_gossipsub_handlers.py b/tests/consensus/devnet/networking/test_gossipsub_handlers.py new file mode 100644 index 00000000..8155e49b --- /dev/null +++ b/tests/consensus/devnet/networking/test_gossipsub_handlers.py @@ -0,0 +1,293 @@ +"""Test vectors for gossipsub handler protocol decisions.""" + +import pytest +from consensus_testing import GossipsubHandlerTestFiller + +pytestmark = pytest.mark.valid_until("Devnet") + +TOPIC = "test_topic" +PARAMS = {"d": 4, "dLow": 3, "dHigh": 6, "dLazy": 3} +MSG_ID = "0x8dc6bba09a9550cdccb1b1b432bb04919901ce1e" +"""Message ID for topic=b'test_topic', data=0xdeadbeef, invalid-snappy domain.""" + +MSG_ID_2 = "0xb51451075fcc3e8f0baa9041d8256c647ceaeaac" +"""Message ID for topic=b'test_topic', data=0xcafebabe, invalid-snappy domain.""" + + +# --- GRAFT handler --- + + +def test_graft_accept(gossipsub_handler: GossipsubHandlerTestFiller) -> None: + """Accept GRAFT when subscribed and mesh has capacity.""" + gossipsub_handler( + handler_name="graft", + params=PARAMS, + initial_state={ + "subscriptions": [TOPIC], + "meshes": {TOPIC: ["peerAx"]}, + "peers": { + "peerAx": {"subscriptions": [TOPIC]}, + "peerBx": {"subscriptions": [TOPIC]}, + }, + }, + event={"fromPeer": "peerBx", "graft": [{"topicId": TOPIC}]}, + ) + + +def test_graft_reject_capacity(gossipsub_handler: GossipsubHandlerTestFiller) -> None: + """Reject GRAFT with PRUNE when mesh is at d_high.""" + gossipsub_handler( + handler_name="graft", + params={"d": 4, "dLow": 3, "dHigh": 3, "dLazy": 3}, + initial_state={ + "subscriptions": [TOPIC], + "meshes": {TOPIC: ["peerAx", "peerBx", "peerCx"]}, + "peers": { + "peerAx": {"subscriptions": [TOPIC]}, + "peerBx": {"subscriptions": [TOPIC]}, + "peerCx": {"subscriptions": [TOPIC]}, + "peerDx": {"subscriptions": [TOPIC]}, + }, + }, + event={"fromPeer": "peerDx", "graft": [{"topicId": TOPIC}]}, + ) + + +def test_graft_reject_backoff(gossipsub_handler: GossipsubHandlerTestFiller) -> None: + """Reject GRAFT with PRUNE when peer is in backoff period.""" + gossipsub_handler( + handler_name="graft", + params=PARAMS, + initial_state={ + "subscriptions": [TOPIC], + "meshes": {TOPIC: ["peerAx"]}, + "peers": { + "peerAx": {"subscriptions": [TOPIC]}, + "peerBx": {"subscriptions": [TOPIC], "backoff": {TOPIC: 1060.0}}, + }, + }, + event={"fromPeer": "peerBx", "graft": [{"topicId": TOPIC}]}, + now=1000.0, + ) + + +def test_graft_ignore_unsubscribed(gossipsub_handler: GossipsubHandlerTestFiller) -> None: + """Silently ignore GRAFT for a topic we are not subscribed to.""" + gossipsub_handler( + handler_name="graft", + params=PARAMS, + initial_state={ + "subscriptions": [], + "meshes": {}, + "peers": { + "peerAx": {"subscriptions": [TOPIC]}, + }, + }, + event={"fromPeer": "peerAx", "graft": [{"topicId": TOPIC}]}, + ) + + +def test_graft_idempotent(gossipsub_handler: GossipsubHandlerTestFiller) -> None: + """GRAFT for a peer already in mesh is harmless.""" + gossipsub_handler( + handler_name="graft", + params=PARAMS, + initial_state={ + "subscriptions": [TOPIC], + "meshes": {TOPIC: ["peerAx"]}, + "peers": { + "peerAx": {"subscriptions": [TOPIC]}, + }, + }, + event={"fromPeer": "peerAx", "graft": [{"topicId": TOPIC}]}, + ) + + +# --- PRUNE handler --- + + +def test_prune_with_backoff(gossipsub_handler: GossipsubHandlerTestFiller) -> None: + """PRUNE removes peer from mesh and sets backoff timer.""" + gossipsub_handler( + handler_name="prune", + params=PARAMS, + initial_state={ + "subscriptions": [TOPIC], + "meshes": {TOPIC: ["peerAx", "peerBx"]}, + "peers": { + "peerAx": {"subscriptions": [TOPIC]}, + "peerBx": {"subscriptions": [TOPIC]}, + }, + }, + event={"fromPeer": "peerAx", "prune": [{"topicId": TOPIC, "backoff": 60}]}, + ) + + +def test_prune_zero_backoff(gossipsub_handler: GossipsubHandlerTestFiller) -> None: + """PRUNE with backoff=0 removes from mesh but does not set backoff timer.""" + gossipsub_handler( + handler_name="prune", + params=PARAMS, + initial_state={ + "subscriptions": [TOPIC], + "meshes": {TOPIC: ["peerAx"]}, + "peers": { + "peerAx": {"subscriptions": [TOPIC]}, + }, + }, + event={"fromPeer": "peerAx", "prune": [{"topicId": TOPIC, "backoff": 0}]}, + ) + + +# --- IHAVE handler --- + + +def test_ihave_unseen_triggers_iwant(gossipsub_handler: GossipsubHandlerTestFiller) -> None: + """IHAVE for an unseen message triggers an IWANT response.""" + gossipsub_handler( + handler_name="ihave", + params=PARAMS, + initial_state={ + "subscriptions": [TOPIC], + "meshes": {}, + "peers": {"peerAx": {"subscriptions": [TOPIC]}}, + "seenMessageIds": [], + "cachedMessages": [], + }, + event={ + "fromPeer": "peerAx", + "ihave": [{"topicId": TOPIC, "messageIds": [MSG_ID]}], + }, + ) + + +def test_ihave_seen_no_iwant(gossipsub_handler: GossipsubHandlerTestFiller) -> None: + """IHAVE for an already-seen message produces no IWANT.""" + gossipsub_handler( + handler_name="ihave", + params=PARAMS, + initial_state={ + "subscriptions": [TOPIC], + "meshes": {}, + "peers": {"peerAx": {"subscriptions": [TOPIC]}}, + "seenMessageIds": [MSG_ID], + "cachedMessages": [], + }, + event={ + "fromPeer": "peerAx", + "ihave": [{"topicId": TOPIC, "messageIds": [MSG_ID]}], + }, + ) + + +def test_ihave_mixed(gossipsub_handler: GossipsubHandlerTestFiller) -> None: + """IHAVE with seen and unseen IDs. IWANT only for the unseen one.""" + gossipsub_handler( + handler_name="ihave", + params=PARAMS, + initial_state={ + "subscriptions": [TOPIC], + "meshes": {}, + "peers": {"peerAx": {"subscriptions": [TOPIC]}}, + "seenMessageIds": [MSG_ID], + "cachedMessages": [], + }, + event={ + "fromPeer": "peerAx", + "ihave": [{"topicId": TOPIC, "messageIds": [MSG_ID, MSG_ID_2]}], + }, + ) + + +# --- IWANT handler --- + + +def test_iwant_cached_responds(gossipsub_handler: GossipsubHandlerTestFiller) -> None: + """IWANT for a cached message responds with the full message.""" + gossipsub_handler( + handler_name="iwant", + params=PARAMS, + initial_state={ + "subscriptions": [TOPIC], + "meshes": {}, + "peers": {"peerAx": {"subscriptions": [TOPIC]}}, + "seenMessageIds": [], + "cachedMessages": [ + {"topic": TOPIC, "data": "0xdeadbeef", "messageId": MSG_ID}, + ], + }, + event={ + "fromPeer": "peerAx", + "iwant": [{"messageIds": [MSG_ID]}], + }, + ) + + +# --- Message handler --- + + +def test_message_forward_to_mesh(gossipsub_handler: GossipsubHandlerTestFiller) -> None: + """New message forwarded to mesh peers except the sender.""" + gossipsub_handler( + handler_name="message", + params=PARAMS, + initial_state={ + "subscriptions": [TOPIC], + "meshes": {TOPIC: ["senderX", "peerAx", "peerBx"]}, + "peers": { + "senderX": {"subscriptions": [TOPIC]}, + "peerAx": {"subscriptions": [TOPIC]}, + "peerBx": {"subscriptions": [TOPIC]}, + }, + }, + event={ + "fromPeer": "senderX", + "publish": [{"topic": TOPIC, "data": "0xdeadbeef"}], + }, + ) + + +def test_message_duplicate_not_forwarded( + gossipsub_handler: GossipsubHandlerTestFiller, +) -> None: + """Duplicate message (already in seen cache) is not forwarded.""" + gossipsub_handler( + handler_name="message", + params=PARAMS, + initial_state={ + "subscriptions": [TOPIC], + "meshes": {TOPIC: ["senderX", "peerAx"]}, + "peers": { + "senderX": {"subscriptions": [TOPIC]}, + "peerAx": {"subscriptions": [TOPIC]}, + }, + "seenMessageIds": [MSG_ID], + }, + event={ + "fromPeer": "senderX", + "publish": [{"topic": TOPIC, "data": "0xdeadbeef"}], + }, + ) + + +def test_message_idontwant_skips_peer( + gossipsub_handler: GossipsubHandlerTestFiller, +) -> None: + """Peer that sent IDONTWANT for this message ID is skipped during forwarding.""" + gossipsub_handler( + handler_name="message", + params=PARAMS, + initial_state={ + "subscriptions": [TOPIC], + "meshes": {TOPIC: ["senderX", "peerAx", "peerBx"]}, + "peers": { + "senderX": {"subscriptions": [TOPIC]}, + "peerAx": {"subscriptions": [TOPIC], "dontWantIds": [MSG_ID]}, + "peerBx": {"subscriptions": [TOPIC]}, + }, + }, + event={ + "fromPeer": "senderX", + "publish": [{"topic": TOPIC, "data": "0xdeadbeef"}], + }, + )