From ec41cd02a7392ff425fe7c9e4d5f0b004f52077e Mon Sep 17 00:00:00 2001 From: Darkdruce Date: Mon, 29 Jun 2026 12:08:36 +0000 Subject: [PATCH] feat: implement health monitor, schema versioning, access logs, and event reference guide Closes #311 - [Backend] Implement Notification Health Monitor Closes #309 - [Contracts] Add Notification Version Tracking Closes #312 - [Contracts] Implement Notification Access Logs Closes #274 - [Docs] Create Smart Contract Event Reference Guide --- CONTRACT_EVENT_REFERENCE.md | 616 ++++++++++++++++++ README.md | 2 + .../hello-world/src/autoshare_logic.rs | 96 +++ .../contracts/hello-world/src/base/events.rs | 46 ++ contract/contracts/hello-world/src/lib.rs | 36 + .../hello-world/src/tests/access_log_test.rs | 66 ++ .../src/tests/schema_version_test.rs | 90 +++ listener/src/api/events-server.ts | 17 + listener/src/index.ts | 9 + .../services/notification-health-monitor.ts | 206 ++++++ 10 files changed, 1184 insertions(+) create mode 100644 CONTRACT_EVENT_REFERENCE.md create mode 100644 contract/contracts/hello-world/src/tests/access_log_test.rs create mode 100644 contract/contracts/hello-world/src/tests/schema_version_test.rs create mode 100644 listener/src/services/notification-health-monitor.ts diff --git a/CONTRACT_EVENT_REFERENCE.md b/CONTRACT_EVENT_REFERENCE.md new file mode 100644 index 0000000..5f645d1 --- /dev/null +++ b/CONTRACT_EVENT_REFERENCE.md @@ -0,0 +1,616 @@ +# Smart Contract Event Reference Guide + +> **Audience**: contributors, integrators, and backend/listener developers. +> +> This guide documents every event emitted by the NotifyChain smart contracts, +> explains their parameters and data types, provides practical examples, and +> offers usage recommendations for indexers and listeners. + +--- + +## Table of Contents + +1. [Overview](#overview) +2. [How Events Are Structured](#how-events-are-structured) +3. [Shared Types](#shared-types) +4. [AutoShare Contract Events](#autoshare-contract-events) + - [AutoshareCreated](#autosharecreated) + - [AutoshareUpdated](#autoshareupdated) + - [GroupDeactivated](#groupdeactivated) + - [GroupActivated](#groupactivated) + - [ContractPaused](#contractpaused) + - [ContractUnpaused](#contractunpaused) + - [AdminTransferred](#admintransferred) + - [Withdrawal](#withdrawal) + - [AuthorizationFailure](#authorizationfailure) + - [CategoryRegistered](#categoryregistered) +5. [Notification Lifecycle Events](#notification-lifecycle-events) + - [NotificationScheduled](#notificationscheduled) + - [NotificationExpired](#notificationexpired) + - [ScheduledNotificationCancelled](#schedulednotificationcancelled) + - [NotificationRevoked](#notificationrevoked) + - [NotificationExtended](#notificationextended) + - [BatchNotificationsCreated](#batchnotificationscreated) + - [BatchProcessingCompleted](#batchprocessingcompleted) +6. [Audit Log Events](#audit-log-events) + - [AuditRecordAppended](#auditrecordappended) +7. [Access Log Events](#access-log-events) + - [NotificationAccessed](#notificationaccessed) +8. [Schema Version Events](#schema-version-events) + - [SchemaVersionSet](#schemaversionset) +9. [Reputation Events](#reputation-events) + - [ReputationUpdated](#reputationupdated) + - [ReputationTierChanged](#reputationtierchanged) + - [NotificationLimitsConfigured](#notificationlimitsconfigured) +10. [TaskBounty Contract Events](#taskbounty-contract-events) + - [TaskCreated](#taskcreated) + - [WorkSubmitted](#worksubmitted) + - [SubmissionApproved](#submissionapproved) + - [SubmissionRejected](#submissionrejected) + - [TaskCancelled](#taskcancelled) + - [DisputeRaised](#disputeraised) +11. [Indexer and Listener Recommendations](#indexer-and-listener-recommendations) + +--- + +## Overview + +NotifyChain contracts emit **typed events** for every meaningful state change. +Off-chain components (listener service, dashboard, third-party indexers) consume +these events to trigger notifications, build audit trails, and display real-time +activity. + +Events are published via the Stellar Soroban event system. Each event has: + +- **Topics** – indexed fields that can be matched without decoding the full payload. +- **Data** – the event body (non-indexed fields serialised as XDR). + +--- + +## How Events Are Structured + +Every AutoShare contract event follows a consistent topic layout: + +``` +Topic 0: event name (Symbol, added automatically by the Soroban runtime) +Topic 1+: primary business key(s) (e.g. creator Address, notification_id) +Topic N-1: NotificationCategory (u32 enum) +Topic N: NotificationPriority (u32 enum, on events that carry it) +Data: remaining fields +``` + +> **Backward compatibility note**: `NotificationCategory` and `NotificationPriority` +> were added as the last topics of every event. Existing listeners that only read +> the event name (topic 0) and the original topics are unaffected — the extra +> trailing topics are ignored by consumers that don't look for them. + +--- + +## Shared Types + +### `NotificationCategory` (u32) + +| Value | Variant | Description | +|-------|---------|-------------| +| `0` | `Group` | AutoShare group lifecycle events | +| `1` | `Admin` | Administrative/system actions | +| `2` | `Financial` | Fund movement (withdrawals) | +| `3` | `Notification` | Scheduled notification operations | + +### `NotificationPriority` (u32) + +| Value | Variant | Description | +|-------|---------|-------------| +| `0` | `Low` | Informational; no action required | +| `1` | `Medium` | Standard operational event | +| `2` | `High` | Review promptly | +| `3` | `Critical` | Security-relevant or funds-moving; immediate attention | + +### `AuditAction` (u32) + +| Value | Variant | Description | +|-------|---------|-------------| +| `0` | `Created` | Notification scheduled on-chain | +| `1` | `DeliveryAttempt` | Delivery attempt made | +| `2` | `DeliveryFailed` | Delivery attempt failed | +| `3` | `Acknowledged` | Recipient acknowledged | +| `4` | `Cancelled` | Cancelled before expiry | +| `5` | `Expired` | Expired naturally | + +--- + +## AutoShare Contract Events + +### AutoshareCreated + +Emitted when a new AutoShare group is created. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `creator` | `Address` | ✅ topic | Creator of the group | +| `category` | `NotificationCategory` | ✅ topic | Always `Group` (0) | +| `priority` | `NotificationPriority` | ✅ topic | Always `Low` (0) | +| `id` | `BytesN<32>` | data | Unique group identifier | + +**Example** (decoded XDR topics/data): +```json +{ + "topics": ["AutoshareCreated", "GABC...creator", 0, 0], + "data": "aabb...groupId32bytes" +} +``` + +**Usage**: index by `creator` and `id` to build a group registry. + +--- + +### AutoshareUpdated + +Emitted when a group's member list is updated. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `updater` | `Address` | ✅ topic | Address that triggered the update | +| `category` | `NotificationCategory` | ✅ topic | Always `Group` (0) | +| `priority` | `NotificationPriority` | ✅ topic | Always `Low` (0) | +| `id` | `BytesN<32>` | data | Group identifier | + +--- + +### GroupDeactivated + +Emitted when a group is deactivated by its creator. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `creator` | `Address` | ✅ topic | Group creator | +| `category` | `NotificationCategory` | ✅ topic | Always `Group` (0) | +| `priority` | `NotificationPriority` | ✅ topic | Always `Low` (0) | +| `id` | `BytesN<32>` | data | Group identifier | + +--- + +### GroupActivated + +Emitted when a previously deactivated group is reactivated. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `creator` | `Address` | ✅ topic | Group creator | +| `category` | `NotificationCategory` | ✅ topic | Always `Group` (0) | +| `priority` | `NotificationPriority` | ✅ topic | Always `Low` (0) | +| `id` | `BytesN<32>` | data | Group identifier | + +--- + +### ContractPaused + +Emitted when the contract is paused by the admin. All mutating calls are rejected while paused. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `admin` | `Address` | ✅ topic | Admin who paused the contract | +| `category` | `NotificationCategory` | ✅ topic | Always `Admin` (1) | +| `priority` | `NotificationPriority` | ✅ topic | Always `High` (2) | + +--- + +### ContractUnpaused + +Emitted when the contract is resumed. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `admin` | `Address` | ✅ topic | Admin who unpaused the contract | +| `category` | `NotificationCategory` | ✅ topic | Always `Admin` (1) | +| `priority` | `NotificationPriority` | ✅ topic | Always `Medium` (1) | + +--- + +### AdminTransferred + +Emitted when admin rights are transferred to a new address. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `old_admin` | `Address` | ✅ topic | Previous admin | +| `category` | `NotificationCategory` | ✅ topic | Always `Admin` (1) | +| `priority` | `NotificationPriority` | ✅ topic | Always `Critical` (3) | +| `new_admin` | `Address` | data | Incoming admin | + +**Usage**: trigger an immediate alert — admin transfers are security-critical. + +--- + +### Withdrawal + +Emitted when the admin withdraws collected usage fees. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `token` | `Address` | ✅ topic | Token contract address | +| `recipient` | `Address` | ✅ topic | Recipient of the funds | +| `category` | `NotificationCategory` | ✅ topic | Always `Financial` (2) | +| `priority` | `NotificationPriority` | ✅ topic | Always `Critical` (3) | +| `amount` | `i128` | data | Amount transferred (in token smallest unit) | + +--- + +### AuthorizationFailure + +Emitted when the contract detects an unauthorized call attempt. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `caller` | `Address` | ✅ topic | Address that attempted the action | +| `category` | `NotificationCategory` | ✅ topic | Always `Admin` (1) | +| `priority` | `NotificationPriority` | ✅ topic | Always `Critical` (3) | +| `action` | `String` | data | Name of the attempted action | + +--- + +### CategoryRegistered + +Emitted when a notification category is registered on-chain. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `admin` | `Address` | ✅ topic | Admin who registered the category | +| `category` | `NotificationCategory` | ✅ topic | Category being registered | +| `priority` | `NotificationPriority` | ✅ topic | Always `Low` (0) | + +--- + +## Notification Lifecycle Events + +### NotificationScheduled + +Emitted when a notification is scheduled on-chain with a bounded lifetime. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `creator` | `Address` | ✅ topic | Address that scheduled the notification | +| `category` | `NotificationCategory` | ✅ topic | Always `Notification` (3) | +| `priority` | `NotificationPriority` | ✅ topic | Always `Low` (0) | +| `notification_id` | `BytesN<32>` | data | Unique notification identifier | + +**Example**: +```json +{ + "topics": ["NotificationScheduled", "GABC...creator", 3, 0], + "data": "aabb...notificationId32bytes" +} +``` + +**Usage**: record in your listener's store with `(notification_id, creator, scheduled_at)`. + +--- + +### NotificationExpired + +Emitted when a scheduled notification's lifetime elapses. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `notification_id` | `BytesN<32>` | ✅ topic | Notification that expired | +| `category` | `NotificationCategory` | ✅ topic | Always `Notification` (3) | +| `priority` | `NotificationPriority` | ✅ topic | Always `Low` (0) | +| `expires_at` | `u64` | data | Ledger timestamp (seconds) when it expired | + +--- + +### ScheduledNotificationCancelled + +Emitted when a scheduled notification is cancelled before expiry. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `caller` | `Address` | ✅ topic | Who cancelled it | +| `category` | `NotificationCategory` | ✅ topic | Always `Notification` (3) | +| `priority` | `NotificationPriority` | ✅ topic | Always `Medium` (1) | +| `notification_id` | `BytesN<32>` | data | Cancelled notification's identifier | + +--- + +### NotificationRevoked + +Emitted when a notification is revoked by an authorized sender. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `notification_id` | `BytesN<32>` | ✅ topic | Revoked notification | +| `revoked_by` | `Address` | ✅ topic | Who initiated the revocation | +| `category` | `NotificationCategory` | ✅ topic | Always `Notification` (3) | +| `priority` | `NotificationPriority` | ✅ topic | Always `High` (2) | +| `revoked_at` | `u64` | data | Ledger timestamp (seconds) of revocation | + +--- + +### NotificationExtended + +Emitted when a notification's expiry is extended. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `notification_id` | `BytesN<32>` | ✅ topic | Extended notification | +| `caller` | `Address` | ✅ topic | Who extended it | +| `category` | `NotificationCategory` | ✅ topic | Always `Notification` (3) | +| `priority` | `NotificationPriority` | ✅ topic | Always `Low` (0) | +| `new_expires_at` | `u64` | data | New ledger timestamp (seconds) for expiry | + +--- + +### BatchNotificationsCreated + +Emitted after a batch of notifications is scheduled in one transaction. One +`NotificationScheduled` event is also emitted for each individual notification. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `creator` | `Address` | ✅ topic | Address that submitted the batch | +| `category` | `NotificationCategory` | ✅ topic | Always `Notification` (3) | +| `priority` | `NotificationPriority` | ✅ topic | Always `Low` (0) | +| `count` | `u32` | data | Number of notifications in the batch | +| `ids` | `Vec>` | data | All notification identifiers | + +**Usage**: use `count` to verify you received exactly that many `NotificationScheduled` events in the same transaction. + +--- + +### BatchProcessingCompleted + +Emitted when an off-chain batch of notifications finishes processing. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `batch_id` | `BytesN<32>` | ✅ topic | Batch identifier | +| `processed_count` | `u32` | data | Number of notifications processed | + +--- + +## Audit Log Events + +### AuditRecordAppended + +Emitted whenever a new audit record is appended to the on-chain log. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `notification_id` | `BytesN<32>` | ✅ topic | Notification the record belongs to | +| `action` | `AuditAction` | ✅ topic | Lifecycle stage (see shared types) | +| `category` | `NotificationCategory` | ✅ topic | Always `Notification` (3) | +| `seq` | `u64` | data | Monotonically increasing sequence number | +| `actor` | `Address` | data | Who triggered the action | +| `timestamp` | `u64` | data | Ledger timestamp (seconds) | + +**Usage**: key off `(notification_id, action)` to reconstruct the full lifecycle +of a notification. The `seq` field provides a stable total order across all records. + +**Example**: +```json +{ + "topics": ["AuditRecordAppended", "aabb...notificationId", 1, 3], + "data": { "seq": 5, "actor": "GABC...", "timestamp": 1720000000 } +} +``` + +--- + +## Access Log Events + +### NotificationAccessed + +Emitted whenever a protected notification record is accessed (read). Enables +compliance and traceability for off-chain auditors. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `notification_id` | `BytesN<32>` | ✅ topic | Notification that was accessed | +| `accessor` | `Address` | ✅ topic | Address that accessed the record | +| `category` | `NotificationCategory` | ✅ topic | Always `Notification` (3) | +| `accessed_at` | `u64` | data | Ledger timestamp (seconds) of access | + +**Usage**: index by `notification_id` to build a per-notification access trail. +Access records are immutable — once emitted they cannot be modified. + +**Example**: +```json +{ + "topics": ["NotificationAccessed", "aabb...notificationId", "GABC...accessor", 3], + "data": { "accessed_at": 1720000100 } +} +``` + +--- + +## Schema Version Events + +### SchemaVersionSet + +Emitted when the on-chain notification schema version is set or upgraded by the admin. +Off-chain consumers should check this event to gate their parsing logic. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `admin` | `Address` | ✅ topic | Admin who set the version | +| `category` | `NotificationCategory` | ✅ topic | Always `Admin` (1) | +| `priority` | `NotificationPriority` | ✅ topic | Always `Medium` (1) | +| `schema_version` | `u32` | data | New schema version number | +| `previous_version` | `u32` | data | Previous version (0 if first set) | + +**Usage**: listeners should reject event payloads whose `schema_version` is +outside their supported range. Use `is_version_supported()` on-chain or +compare against your listener's `SUPPORTED_SCHEMA_VERSIONS` constant. + +**Example**: +```json +{ + "topics": ["SchemaVersionSet", "GABC...admin", 1, 1], + "data": { "schema_version": 1, "previous_version": 0 } +} +``` + +--- + +## Reputation Events + +### ReputationUpdated + +Emitted when a sender's reputation score changes after a delivery outcome. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `sender` | `Address` | ✅ topic | Sender whose score was updated | +| `new_score` | `i64` | data | Updated reputation score | +| `successful_count` | `u32` | data | Cumulative successful deliveries | +| `failed_count` | `u32` | data | Cumulative failed deliveries | + +--- + +### ReputationTierChanged + +Emitted when a sender's reputation tier changes (e.g., Bronze → Silver). + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `sender` | `Address` | ✅ topic | Sender | +| `category` | `NotificationCategory` | ✅ topic | Always `Notification` (3) | +| `priority` | `NotificationPriority` | ✅ topic | Always `Low` (0) | +| `old_tier` | `u32` | data | Previous tier (0=Unverified … 4=Platinum) | +| `new_tier` | `u32` | data | New tier | +| `reputation_score` | `i64` | data | Score at the time of tier change | + +--- + +### NotificationLimitsConfigured + +Emitted when the admin sets protocol-level notification limits. + +| Field | Type | Indexed | Description | +|-------|------|---------|-------------| +| `admin` | `Address` | ✅ topic | Admin who configured the limits | +| `category` | `NotificationCategory` | ✅ topic | Always `Admin` (1) | +| `priority` | `NotificationPriority` | ✅ topic | Always `Medium` (1) | +| `max_payload_size` | `u32` | data | Maximum notification payload in bytes | +| `max_expiration_seconds` | `u64` | data | Maximum TTL in seconds | +| `min_expiration_seconds` | `u64` | data | Minimum TTL in seconds | +| `max_batch_size` | `u32` | data | Maximum notifications per batch | + +--- + +## TaskBounty Contract Events + +The TaskBounty contract (`Documents/Task Bounty/`) uses a simpler event format +(two-element topic tuple + data tuple) without `NotificationCategory` topics. + +### TaskCreated + +Emitted when a new bounty task is created. + +| Topics | `(symbol "task", symbol "created")` | +|--------|--------------------------------------| +| Data | `(task_id: u64, poster: Address, title: String, reward: i128, deadline: u64)` | + +--- + +### WorkSubmitted + +Emitted when a contributor submits work for a task. + +| Topics | `(symbol "work", symbol "submit")` | +|--------|-------------------------------------| +| Data | `(task_id: u64, submission_id: u64, contributor: Address, work_url: String)` | + +--- + +### SubmissionApproved + +Emitted when a poster approves a submission and the reward is transferred. + +| Topics | `(symbol "sub", symbol "approved")` | +|--------|--------------------------------------| +| Data | `(task_id: u64, submission_id: u64, contributor: Address, reward: i128)` | + +--- + +### SubmissionRejected + +Emitted when a poster rejects a submission. + +| Topics | `(symbol "sub", symbol "rejected")` | +|--------|--------------------------------------| +| Data | `(task_id: u64, submission_id: u64, contributor: Address)` | + +--- + +### TaskCancelled + +Emitted when a task is cancelled by its poster. + +| Topics | `(symbol "task", symbol "cancel")` | +|--------|-------------------------------------| +| Data | `(task_id: u64, poster: Address)` | + +--- + +### DisputeRaised + +Emitted when a dispute is raised against a submission. + +| Topics | `(symbol "dispute", symbol "raised")` | +|--------|----------------------------------------| +| Data | `(task_id: u64, submission_id: u64, raiser: Address, reason: String)` | + +--- + +## Indexer and Listener Recommendations + +### Subscribing selectively + +Use the `category` topic (second-to-last for most events) to filter event streams +without decoding the full payload: + +``` +category == 2 → Financial events only (Withdrawal) +category == 3 → Critical == only (AdminTransferred, Withdrawal, AuthorizationFailure) +``` + +### Handling schema versions + +1. On startup, call `get_schema_version()` to read the current version. +2. Subscribe to `SchemaVersionSet` events and refresh your version gate whenever it fires. +3. Drop any event whose decoded `schema_version` field is outside your supported range and emit a warning log. + +### Building an audit trail + +- Subscribe to `AuditRecordAppended` events and persist each record keyed by `(notification_id, seq)`. +- The `seq` field is monotonically increasing across *all* notifications — use it for total ordering. +- Audit records are immutable on-chain; do not allow updates in your off-chain store. + +### Access log compliance + +- Subscribe to `NotificationAccessed` events and persist `(notification_id, accessor, accessed_at)`. +- These records are compliance artefacts — treat them as append-only. + +### Deduplication + +The listener service deduplicates events by `(contract_address, event_id)`. If +you build a custom indexer, apply the same fingerprint to avoid double-counting +`AuditRecordAppended` and `NotificationScheduled` events, which are high-volume. + +### Reconnect and replay + +The Stellar RPC `getEvents` endpoint supports cursor-based pagination. Store the +last processed `event_id` (or ledger cursor) and resume from that point after a +reconnect to avoid gaps in your event stream. + +### Priority-based alerting + +Map `NotificationPriority` to your alerting system: + +| Priority | Action | +|----------|--------| +| `Low` (0) | Log only | +| `Medium` (1) | Dashboard update | +| `High` (2) | Notify on-call (non-urgent) | +| `Critical` (3) | Page on-call immediately | diff --git a/README.md b/README.md index a25fc52..3f69331 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,8 @@ The project enables developers to build reactive decentralized applications with 10. [License](#license) > **Listener service docs**: [Notification Failure Recovery](NOTIFICATION_FAILURE_RECOVERY.md) — retry lifecycle, configuration, and troubleshooting. +> +> **Event reference**: [Smart Contract Event Reference Guide](CONTRACT_EVENT_REFERENCE.md) — all emitted events, parameters, data types, and usage recommendations for indexers and listeners. --- diff --git a/contract/contracts/hello-world/src/autoshare_logic.rs b/contract/contracts/hello-world/src/autoshare_logic.rs index 20e4dd7..7b2a1a2 100644 --- a/contract/contracts/hello-world/src/autoshare_logic.rs +++ b/contract/contracts/hello-world/src/autoshare_logic.rs @@ -7,6 +7,7 @@ use crate::base::events::{ Withdrawal, BatchProcessingCompleted, NotificationExtended, NotificationLimitsConfigured, NotificationPriority, NotificationRevoked, NotificationScheduled, ScheduledNotificationCancelled, Withdrawal, + SchemaVersionSet, NotificationAccessed, }; use crate::base::types::{ AuditRecord, AutoShareDetails, GroupMember, NotificationLimits, PaymentHistory, @@ -53,6 +54,8 @@ pub enum DataKey { NotificationRevokers(BytesN<32>), NotificationLimits, RegisteredCategories, + /// Stores the current on-chain notification schema version. + SchemaVersion, } // ============================================================================ @@ -1623,3 +1626,96 @@ pub fn get_notification_limits(env: Env) -> NotificationLimits { max_batch_size: 1000, }) } + +// ============================================================================ +// Schema Version Tracking (Issue #309) +// ============================================================================ + +/// The minimum schema version this contract supports. +const MIN_SUPPORTED_SCHEMA_VERSION: u32 = 1; +/// The maximum (current) schema version this contract supports. +const MAX_SUPPORTED_SCHEMA_VERSION: u32 = 1; + +/// Sets the on-chain notification schema version. Only the admin can call this. +/// +/// Emits a [`SchemaVersionSet`] event so off-chain consumers can detect protocol +/// upgrades and reject payloads whose version they cannot handle. +/// +/// # Errors +/// - [`Error::Unauthorized`] – caller is not the admin. +/// - [`Error::InvalidInput`] – `schema_version` is outside the supported range. +pub fn set_schema_version(env: Env, admin: Address, schema_version: u32) -> Result<(), Error> { + admin.require_auth(); + + let stored_admin = get_admin(env.clone())?; + if admin != stored_admin { + return Err(Error::Unauthorized); + } + + if schema_version < MIN_SUPPORTED_SCHEMA_VERSION || schema_version > MAX_SUPPORTED_SCHEMA_VERSION { + return Err(Error::InvalidInput); + } + + let key = DataKey::SchemaVersion; + let previous_version: u32 = env + .storage() + .persistent() + .get::(&key) + .unwrap_or(0); + + env.storage().persistent().set(&key, &schema_version); + + SchemaVersionSet { + admin, + category: NotificationCategory::Admin, + priority: NotificationPriority::Medium, + schema_version, + previous_version, + } + .publish(&env); + + Ok(()) +} + +/// Returns the current on-chain schema version (0 if never set). +pub fn get_schema_version(env: Env) -> u32 { + env.storage() + .persistent() + .get::(&DataKey::SchemaVersion) + .unwrap_or(0) +} + +/// Returns whether `version` is within the supported range. +pub fn is_version_supported(_env: Env, version: u32) -> bool { + version >= MIN_SUPPORTED_SCHEMA_VERSION && version <= MAX_SUPPORTED_SCHEMA_VERSION +} + +// ============================================================================ +// Access Logging (Issue #312) +// ============================================================================ + +/// Emits a [`NotificationAccessed`] event for the given notification. +/// +/// Call this whenever a protected notification record is read so that +/// off-chain indexers can build an immutable access trail for compliance. +pub fn record_notification_access( + env: Env, + notification_id: BytesN<32>, + accessor: Address, +) -> Result<(), Error> { + // Verify the notification exists. + let key = DataKey::ScheduledNotification(notification_id.clone()); + if !env.storage().persistent().has(&key) { + return Err(Error::NotFound); + } + + NotificationAccessed { + notification_id, + accessor, + category: NotificationCategory::Notification, + accessed_at: env.ledger().timestamp(), + } + .publish(&env); + + Ok(()) +} diff --git a/contract/contracts/hello-world/src/base/events.rs b/contract/contracts/hello-world/src/base/events.rs index 56b079d..0ae1791 100644 --- a/contract/contracts/hello-world/src/base/events.rs +++ b/contract/contracts/hello-world/src/base/events.rs @@ -377,3 +377,49 @@ pub struct ReputationTierChanged { pub min_expiration_seconds: u64, pub max_batch_size: u32, } + +// ============================================================================ +// Schema Version Tracking (Issue #309) +// ============================================================================ + +/// Emitted when the on-chain notification schema version is set or upgraded. +/// +/// Off-chain consumers should read `schema_version` from every event to gate +/// their parsing logic. Unsupported versions must be rejected at the listener +/// layer so incompatible payloads never reach downstream consumers. +#[contractevent] +#[derive(Clone)] +pub struct SchemaVersionSet { + #[topic] + pub admin: Address, + #[topic] + pub category: NotificationCategory, + #[topic] + pub priority: NotificationPriority, + /// New schema version number. + pub schema_version: u32, + /// Previous schema version (0 when first set). + pub previous_version: u32, +} + +// ============================================================================ +// Access Logging (Issue #312) +// ============================================================================ + +/// Emitted whenever a protected notification record is accessed. +/// +/// Off-chain indexers should key off `(notification_id, accessor)` to build an +/// immutable access trail. The `accessed_at` timestamp is provided for ordering +/// and compliance reporting. +#[contractevent] +#[derive(Clone)] +pub struct NotificationAccessed { + #[topic] + pub notification_id: BytesN<32>, + #[topic] + pub accessor: Address, + #[topic] + pub category: NotificationCategory, + /// Ledger timestamp (seconds) when the access occurred. + pub accessed_at: u64, +} diff --git a/contract/contracts/hello-world/src/lib.rs b/contract/contracts/hello-world/src/lib.rs index 16c4509..a9f86e3 100644 --- a/contract/contracts/hello-world/src/lib.rs +++ b/contract/contracts/hello-world/src/lib.rs @@ -542,6 +542,36 @@ impl AutoShareContract { pub fn get_sender_reputation_tier(env: Env, sender: Address) -> u32 { reputation_logic::get_reputation_tier(&env, &sender).unwrap_or(0) } + + // ============================================================================ + // Schema Version Tracking (Issue #309) + // ============================================================================ + + /// Sets the on-chain notification schema version. Only the admin can call. + /// Emits a SchemaVersionSet event. Rejects versions outside the supported range. + pub fn set_schema_version(env: Env, admin: Address, schema_version: u32) { + autoshare_logic::set_schema_version(env, admin, schema_version).unwrap(); + } + + /// Returns the current on-chain schema version (0 if never set). + pub fn get_schema_version(env: Env) -> u32 { + autoshare_logic::get_schema_version(env) + } + + /// Returns true if the given schema version is within the supported range. + pub fn is_version_supported(env: Env, version: u32) -> bool { + autoshare_logic::is_version_supported(env, version) + } + + // ============================================================================ + // Access Logging (Issue #312) + // ============================================================================ + + /// Emits a NotificationAccessed event for the specified notification. + /// Call whenever a protected notification record is read to build an immutable access trail. + pub fn record_notification_access(env: Env, notification_id: BytesN<32>, accessor: Address) { + autoshare_logic::record_notification_access(env, notification_id, accessor).unwrap(); + } } #[cfg(test)] @@ -602,4 +632,10 @@ mod tests { #[path = "../tests/fuzz_test.rs"] mod fuzz_test; + + #[path = "../tests/schema_version_test.rs"] + mod schema_version_test; + + #[path = "../tests/access_log_test.rs"] + mod access_log_test; } diff --git a/contract/contracts/hello-world/src/tests/access_log_test.rs b/contract/contracts/hello-world/src/tests/access_log_test.rs new file mode 100644 index 0000000..93a13bc --- /dev/null +++ b/contract/contracts/hello-world/src/tests/access_log_test.rs @@ -0,0 +1,66 @@ +use crate::{AutoShareContract, AutoShareContractClient}; +use soroban_sdk::{testutils::Address as _, Address, BytesN, Env, String}; + +fn setup(env: &Env) -> (Address, AutoShareContractClient) { + let id = env.register(AutoShareContract, ()); + let client = AutoShareContractClient::new(env, &id); + let admin = Address::generate(env); + env.mock_all_auths(); + client.initialize_admin(&admin); + (admin, client) +} + +fn schedule_test_notification( + client: &AutoShareContractClient, + env: &Env, + creator: &Address, +) -> BytesN<32> { + let mut id_bytes = [0u8; 32]; + id_bytes[0] = 42; + let notification_id = BytesN::from_array(env, &id_bytes); + client.schedule_notification(¬ification_id, creator, &3600u64, &String::from_str(env, "Test")); + notification_id +} + +#[test] +fn test_access_event_emitted_for_existing_notification() { + let env = Env::default(); + env.mock_all_auths(); + let (admin, client) = setup(&env); + let notification_id = schedule_test_notification(&client, &env, &admin); + let accessor = Address::generate(&env); + + // Should not panic — notification exists. + client.record_notification_access(¬ification_id, &accessor); +} + +#[test] +#[should_panic] +fn test_access_event_fails_for_nonexistent_notification() { + let env = Env::default(); + env.mock_all_auths(); + let (_admin, client) = setup(&env); + + let mut id_bytes = [0u8; 32]; + id_bytes[0] = 99; + let notification_id = BytesN::from_array(&env, &id_bytes); + let accessor = Address::generate(&env); + + // Should panic — notification does not exist. + client.record_notification_access(¬ification_id, &accessor); +} + +#[test] +fn test_multiple_access_events_can_be_emitted() { + let env = Env::default(); + env.mock_all_auths(); + let (admin, client) = setup(&env); + let notification_id = schedule_test_notification(&client, &env, &admin); + + let accessor1 = Address::generate(&env); + let accessor2 = Address::generate(&env); + + client.record_notification_access(¬ification_id, &accessor1); + client.record_notification_access(¬ification_id, &accessor2); + // Both succeed — audit trail is append-only. +} diff --git a/contract/contracts/hello-world/src/tests/schema_version_test.rs b/contract/contracts/hello-world/src/tests/schema_version_test.rs new file mode 100644 index 0000000..994d7ba --- /dev/null +++ b/contract/contracts/hello-world/src/tests/schema_version_test.rs @@ -0,0 +1,90 @@ +use crate::{AutoShareContract, AutoShareContractClient}; +use soroban_sdk::{testutils::Address as _, Address, Env}; + +fn setup(env: &Env) -> (Address, AutoShareContractClient) { + let id = env.register(AutoShareContract, ()); + let client = AutoShareContractClient::new(env, &id); + let admin = Address::generate(env); + env.mock_all_auths(); + client.initialize_admin(&admin); + (admin, client) +} + +#[test] +fn test_schema_version_default_is_zero() { + let env = Env::default(); + let (_admin, client) = setup(&env); + assert_eq!(client.get_schema_version(), 0); +} + +#[test] +fn test_set_schema_version_stores_and_emits() { + let env = Env::default(); + env.mock_all_auths(); + let (admin, client) = setup(&env); + + client.set_schema_version(&admin, &1); + assert_eq!(client.get_schema_version(), 1); +} + +#[test] +fn test_set_schema_version_previous_version_tracked() { + let env = Env::default(); + env.mock_all_auths(); + let (admin, client) = setup(&env); + + // Set once; previous is 0. + client.set_schema_version(&admin, &1); + // Set again to the same supported value. + client.set_schema_version(&admin, &1); + assert_eq!(client.get_schema_version(), 1); +} + +#[test] +fn test_is_version_supported_returns_true_for_valid() { + let env = Env::default(); + let (_admin, client) = setup(&env); + assert!(client.is_version_supported(&1)); +} + +#[test] +fn test_is_version_supported_returns_false_for_zero() { + let env = Env::default(); + let (_admin, client) = setup(&env); + assert!(!client.is_version_supported(&0)); +} + +#[test] +fn test_is_version_supported_returns_false_for_future() { + let env = Env::default(); + let (_admin, client) = setup(&env); + assert!(!client.is_version_supported(&999)); +} + +#[test] +#[should_panic] +fn test_set_schema_version_rejects_unsupported_version() { + let env = Env::default(); + env.mock_all_auths(); + let (admin, client) = setup(&env); + client.set_schema_version(&admin, &999); +} + +#[test] +#[should_panic] +fn test_set_schema_version_rejects_zero() { + let env = Env::default(); + env.mock_all_auths(); + let (admin, client) = setup(&env); + client.set_schema_version(&admin, &0); +} + +#[test] +#[should_panic] +fn test_set_schema_version_requires_admin() { + let env = Env::default(); + env.mock_all_auths(); + let (_admin, client) = setup(&env); + let non_admin = Address::generate(&env); + client.set_schema_version(&non_admin, &1); +} diff --git a/listener/src/api/events-server.ts b/listener/src/api/events-server.ts index 13dfca1..fef115d 100644 --- a/listener/src/api/events-server.ts +++ b/listener/src/api/events-server.ts @@ -40,6 +40,7 @@ import { handleArchiveRequest } from './archive-api'; import { ArchiveStore } from '../services/archive-store'; import { ArchiveService } from '../services/archive-service'; import { NotificationMetricsStore } from '../services/notification-metrics-store'; +import { NotificationHealthMonitor } from '../services/notification-health-monitor'; export interface EventsServerOptions { port: number; @@ -66,6 +67,8 @@ export interface EventsServerOptions { metricsStore?: NotificationMetricsStore | null; /** Maximum age of signed requests in seconds (default: 300 = 5 minutes). */ signatureExpirationSeconds?: number; + /** Optional health monitor — exposes its last report at GET /api/notifications/health. */ + healthMonitor?: NotificationHealthMonitor | null; } type ServiceStatus = 'ok' | 'error' | 'not_configured'; @@ -465,6 +468,20 @@ export function createEventsServer(options: EventsServerOptions): http.Server { return; } + // GET /api/notifications/health + if (req.method === 'GET' && url.pathname === '/api/notifications/health') { + const report = options.healthMonitor?.getLastReport() ?? null; + if (!report) { + res.writeHead(503, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Health monitor not configured or no report yet' })); + return; + } + const httpStatus = report.status === 'unhealthy' ? 503 : report.status === 'degraded' ? 200 : 200; + res.writeHead(httpStatus, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify(report)); + return; + } + // GET /api/rate-limit/metrics if (req.method === 'GET' && url.pathname === '/api/rate-limit/metrics') { if (!rateLimiter) { diff --git a/listener/src/index.ts b/listener/src/index.ts index 1c71017..97bbce1 100644 --- a/listener/src/index.ts +++ b/listener/src/index.ts @@ -21,6 +21,8 @@ import { NotificationMetricsRunner } from './services/notification-metrics-runne import { eventRegistry } from './store/event-registry'; import logger from './utils/logger'; import { loadConfig, ConfigError } from './config'; +import { NotificationHealthMonitor } from './services/notification-health-monitor'; +import { getWorkerManager } from './services/worker-manager'; dotenv.config(); @@ -38,6 +40,8 @@ async function main() { let metricsRunner: NotificationMetricsRunner | null = null; let metricsStore: NotificationMetricsStore | null = null; + const healthMonitor = new NotificationHealthMonitor(null, getWorkerManager()); + if (config.analytics?.enabled) { initNotificationAnalyticsAggregator(config.analytics); } @@ -118,14 +122,19 @@ async function main() { archiveStore, archiveService, metricsStore, + healthMonitor, }); + healthMonitor.start(); + const subscriber = new EventSubscriber(config); await subscriber.start(); const shutdown = async () => { logger.info('Shutting down services...'); + healthMonitor.stop(); + if (cleanupService) { await cleanupService.stop(); } diff --git a/listener/src/services/notification-health-monitor.ts b/listener/src/services/notification-health-monitor.ts new file mode 100644 index 0000000..47a1159 --- /dev/null +++ b/listener/src/services/notification-health-monitor.ts @@ -0,0 +1,206 @@ +import logger from '../utils/logger'; +import { EventProcessingQueue } from './event-processing-queue'; +import { WorkerManager } from './worker-manager'; +import { eventRegistry } from '../store/event-registry'; + +export type ComponentStatus = 'healthy' | 'degraded' | 'unhealthy'; + +export interface QueueHealth { + status: ComponentStatus; + pendingJobs: number; + stalledSince: number | null; +} + +export interface WorkerHealth { + status: ComponentStatus; + activeWorkers: number; + isShuttingDown: boolean; +} + +export interface RegistryHealth { + status: ComponentStatus; + eventCount: number; + lastIngestedAt: string | null; + processingDelayMs: number | null; +} + +export interface HealthReport { + status: ComponentStatus; + timestamp: string; + queue: QueueHealth; + workers: WorkerHealth; + registry: RegistryHealth; +} + +export interface NotificationHealthMonitorOptions { + /** How often to run a health check cycle in ms (default: 30_000). */ + intervalMs?: number; + /** Number of consecutive poll cycles with queue depth unchanged before marking stalled (default: 3). */ + stallThresholdCycles?: number; + /** Max processing delay before registry is considered degraded in ms (default: 60_000). */ + maxProcessingDelayMs?: number; + /** Injected clock for tests. */ + now?: () => number; +} + +/** + * Continuously monitors the health of notification processing components: + * queue depth, worker availability, stalled-job detection, and event registry lag. + * + * Call `start()` once and consume reports via `getLastReport()` or the + * `'report'` event. Call `stop()` for graceful shutdown. + */ +export class NotificationHealthMonitor { + private readonly intervalMs: number; + private readonly stallThresholdCycles: number; + private readonly maxProcessingDelayMs: number; + private readonly now: () => number; + + private queue: EventProcessingQueue | null; + private workerManager: WorkerManager | null; + + private timer: ReturnType | null = null; + private lastReport: HealthReport | null = null; + + // Stall detection: track last observed queue depth and how many cycles it hasn't changed. + private lastQueueDepth = -1; + private stalledCycles = 0; + private stalledSince: number | null = null; + + constructor( + queue: EventProcessingQueue | null, + workerManager: WorkerManager | null, + options: NotificationHealthMonitorOptions = {}, + ) { + this.queue = queue; + this.workerManager = workerManager; + this.intervalMs = options.intervalMs ?? 30_000; + this.stallThresholdCycles = options.stallThresholdCycles ?? 3; + this.maxProcessingDelayMs = options.maxProcessingDelayMs ?? 60_000; + this.now = options.now ?? Date.now; + } + + start(): void { + if (this.timer !== null) return; + this.timer = setInterval(() => { + this.runCheck(); + }, this.intervalMs); + // Run immediately so first report is available without waiting one interval. + this.runCheck(); + logger.info('NotificationHealthMonitor started', { intervalMs: this.intervalMs }); + } + + stop(): void { + if (this.timer !== null) { + clearInterval(this.timer); + this.timer = null; + } + logger.info('NotificationHealthMonitor stopped'); + } + + getLastReport(): HealthReport | null { + return this.lastReport; + } + + // --------------------------------------------------------------------------- + // Private helpers + // --------------------------------------------------------------------------- + + private runCheck(): void { + const queueHealth = this.checkQueue(); + const workerHealth = this.checkWorkers(); + const registryHealth = this.checkRegistry(); + + const overallStatus = this.deriveOverallStatus( + queueHealth.status, + workerHealth.status, + registryHealth.status, + ); + + const report: HealthReport = { + status: overallStatus, + timestamp: new Date(this.now()).toISOString(), + queue: queueHealth, + workers: workerHealth, + registry: registryHealth, + }; + + this.lastReport = report; + + const logFn = + overallStatus === 'healthy' + ? logger.debug.bind(logger) + : overallStatus === 'degraded' + ? logger.warn.bind(logger) + : logger.error.bind(logger); + + logFn('Health report generated', { status: overallStatus, report }); + } + + private checkQueue(): QueueHealth { + if (!this.queue) { + return { status: 'healthy', pendingJobs: 0, stalledSince: null }; + } + + const pending = this.queue.pendingCount(); + + if (pending > 0 && pending === this.lastQueueDepth) { + this.stalledCycles++; + if (this.stalledCycles >= this.stallThresholdCycles && this.stalledSince === null) { + this.stalledSince = this.now(); + logger.warn('Event processing queue appears stalled', { + pendingJobs: pending, + stalledCycles: this.stalledCycles, + }); + } + } else { + this.stalledCycles = 0; + this.stalledSince = null; + } + + this.lastQueueDepth = pending; + + let status: ComponentStatus = 'healthy'; + if (this.stalledSince !== null) { + status = 'unhealthy'; + } else if (pending > 0) { + status = 'degraded'; + } + + return { status, pendingJobs: pending, stalledSince: this.stalledSince }; + } + + private checkWorkers(): WorkerHealth { + if (!this.workerManager) { + return { status: 'healthy', activeWorkers: 0, isShuttingDown: false }; + } + + const activeWorkers = this.workerManager.getActiveJobCount(); + const isShuttingDown = this.workerManager.isShutdownInProgress(); + + const status: ComponentStatus = isShuttingDown ? 'degraded' : 'healthy'; + + return { status, activeWorkers, isShuttingDown }; + } + + private checkRegistry(): RegistryHealth { + const eventCount = eventRegistry.count(); + const { lastIngestedAt: lastIngestedMs } = eventRegistry.getIngestionSnapshot(); + const lastIngestedAt = lastIngestedMs !== null ? new Date(lastIngestedMs).toISOString() : null; + const processingDelayMs = + lastIngestedMs !== null ? this.now() - lastIngestedMs : null; + + let status: ComponentStatus = 'healthy'; + if (processingDelayMs !== null && processingDelayMs > this.maxProcessingDelayMs) { + status = 'degraded'; + } + + return { status, eventCount, lastIngestedAt, processingDelayMs }; + } + + private deriveOverallStatus(...statuses: ComponentStatus[]): ComponentStatus { + if (statuses.includes('unhealthy')) return 'unhealthy'; + if (statuses.includes('degraded')) return 'degraded'; + return 'healthy'; + } +}