From 8d888aebfa2ad5b604b2b45ee927e42fd0c10c15 Mon Sep 17 00:00:00 2001 From: Matthias Pfefferle Date: Thu, 19 Feb 2026 18:15:53 +0100 Subject: [PATCH] Add Server-Sent Events (SSE) for real-time collection streaming Implement the SWICG ActivityPub API SSE spec so C2S clients can subscribe to live updates on outbox and inbox collections instead of polling. - Add Event_Stream_Controller with SSE stream endpoint per collection - Add Event_Stream signal class using transients for efficient polling - Add access_token query parameter fallback for EventSource clients - Add eventStream property to outbox and inbox collection responses - Add proxyEventStream to actor endpoints (proxy returns 501 for now) --- activitypub.php | 4 +- includes/class-event-stream.php | 61 +++ includes/model/class-blog.php | 1 + includes/model/class-user.php | 1 + includes/oauth/class-server.php | 20 +- .../rest/class-actors-inbox-controller.php | 1 + .../rest/class-event-stream-controller.php | 491 ++++++++++++++++++ includes/rest/class-outbox-controller.php | 1 + 8 files changed, 573 insertions(+), 7 deletions(-) create mode 100644 includes/class-event-stream.php create mode 100644 includes/rest/class-event-stream-controller.php diff --git a/activitypub.php b/activitypub.php index bc1e220d8a..a71e5edc95 100644 --- a/activitypub.php +++ b/activitypub.php @@ -74,6 +74,7 @@ function rest_init() { // Load OAuth REST endpoints. ( new Rest\OAuth_Controller() )->register_routes(); ( new Rest\Proxy_Controller() )->register_routes(); + ( new Rest\Event_Stream_Controller() )->register_routes(); } \add_action( 'rest_api_init', __NAMESPACE__ . '\rest_init' ); @@ -87,6 +88,7 @@ function plugin_init() { \add_action( 'init', array( __NAMESPACE__ . '\Comment', 'init' ) ); \add_action( 'init', array( __NAMESPACE__ . '\Dispatcher', 'init' ) ); \add_action( 'init', array( __NAMESPACE__ . '\Embed', 'init' ) ); + \add_action( 'init', array( __NAMESPACE__ . '\Event_Stream', 'init' ) ); \add_action( 'init', array( __NAMESPACE__ . '\Handler', 'init' ) ); \add_action( 'init', array( __NAMESPACE__ . '\Hashtag', 'init' ) ); \add_action( 'init', array( __NAMESPACE__ . '\Link', 'init' ) ); @@ -94,6 +96,7 @@ function plugin_init() { \add_action( 'init', array( __NAMESPACE__ . '\Mention', 'init' ) ); \add_action( 'init', array( __NAMESPACE__ . '\Migration', 'init' ), 1 ); \add_action( 'init', array( __NAMESPACE__ . '\Move', 'init' ) ); + \add_action( 'init', array( __NAMESPACE__ . '\OAuth\Server', 'init' ) ); \add_action( 'init', array( __NAMESPACE__ . '\Options', 'init' ) ); \add_action( 'init', array( __NAMESPACE__ . '\Post_Types', 'init' ) ); \add_action( 'init', array( __NAMESPACE__ . '\Router', 'init' ) ); @@ -101,7 +104,6 @@ function plugin_init() { \add_action( 'init', array( __NAMESPACE__ . '\Scheduler', 'init' ), 0 ); \add_action( 'init', array( __NAMESPACE__ . '\Search', 'init' ) ); \add_action( 'init', array( __NAMESPACE__ . '\Signature', 'init' ) ); - \add_action( 'init', array( __NAMESPACE__ . '\OAuth\Server', 'init' ) ); if ( site_supports_blocks() ) { \add_action( 'init', array( __NAMESPACE__ . '\Blocks', 'init' ) ); diff --git a/includes/class-event-stream.php b/includes/class-event-stream.php new file mode 100644 index 0000000000..ae64636ead --- /dev/null +++ b/includes/class-event-stream.php @@ -0,0 +1,61 @@ + get_rest_url_by_path( 'oauth/authorize' ), 'oauthTokenEndpoint' => get_rest_url_by_path( 'oauth/token' ), 'proxyUrl' => get_rest_url_by_path( 'proxy' ), + 'proxyEventStream' => get_rest_url_by_path( 'proxy-event-stream' ), ); } diff --git a/includes/model/class-user.php b/includes/model/class-user.php index 8f85ea5532..d9b1a2d9a6 100644 --- a/includes/model/class-user.php +++ b/includes/model/class-user.php @@ -322,6 +322,7 @@ public function get_endpoints() { 'oauthAuthorizationEndpoint' => get_rest_url_by_path( 'oauth/authorize' ), 'oauthTokenEndpoint' => get_rest_url_by_path( 'oauth/token' ), 'proxyUrl' => get_rest_url_by_path( 'proxy' ), + 'proxyEventStream' => get_rest_url_by_path( 'proxy-event-stream' ), ); } diff --git a/includes/oauth/class-server.php b/includes/oauth/class-server.php index 6290d2fe6e..ea02ec5cad 100644 --- a/includes/oauth/class-server.php +++ b/includes/oauth/class-server.php @@ -121,16 +121,24 @@ public static function has_scope( $scope ) { public static function get_bearer_token() { $auth_header = self::get_authorization_header(); - if ( ! $auth_header ) { - return null; + if ( $auth_header && 0 === strpos( $auth_header, 'Bearer ' ) ) { + return substr( $auth_header, 7 ); } - // Check for Bearer token. - if ( 0 !== strpos( $auth_header, 'Bearer ' ) ) { - return null; + /* + * Fall back to `access_token` query parameter for EventSource clients. + * The browser EventSource API cannot send custom headers, so the SSE + * spec requires accepting the token as a query parameter. + * + * @see https://swicg.github.io/activitypub-api/sse + */ + // phpcs:disable WordPress.Security.NonceVerification.Recommended, WordPress.Security.ValidatedSanitizedInput.InputNotSanitized -- Opaque auth token, must not be altered. + if ( ! empty( $_GET['access_token'] ) ) { + return \wp_unslash( $_GET['access_token'] ); } + // phpcs:enable WordPress.Security.NonceVerification.Recommended, WordPress.Security.ValidatedSanitizedInput.InputNotSanitized - return substr( $auth_header, 7 ); + return null; } /** diff --git a/includes/rest/class-actors-inbox-controller.php b/includes/rest/class-actors-inbox-controller.php index 751b78b7cf..5714d66b8c 100644 --- a/includes/rest/class-actors-inbox-controller.php +++ b/includes/rest/class-actors-inbox-controller.php @@ -167,6 +167,7 @@ public function get_items( $request ) { 'actor' => $user->get_id(), 'type' => 'OrderedCollection', 'totalItems' => (int) $inbox_query->found_posts, + 'eventStream' => Event_Stream_Controller::get_stream_url( $user_id, 'inbox' ), 'orderedItems' => array(), ); diff --git a/includes/rest/class-event-stream-controller.php b/includes/rest/class-event-stream-controller.php new file mode 100644 index 0000000000..22076e81c1 --- /dev/null +++ b/includes/rest/class-event-stream-controller.php @@ -0,0 +1,491 @@ +[-]?\d+)/(?Poutbox|inbox)/stream'; + + /** + * SSE polling interval in seconds. + * + * @var int + */ + const POLL_INTERVAL = 5; + + /** + * Maximum SSE connection duration in seconds. + * + * @var int + */ + const MAX_DURATION = 300; + + /** + * Map of outbox activity types to SSE event types. + * + * @see https://swicg.github.io/activitypub-api/sse + * + * @var array + */ + const EVENT_TYPE_MAP = array( + 'Create' => 'Add', + 'Announce' => 'Add', + 'Like' => 'Add', + 'Update' => 'Update', + 'Delete' => 'Remove', + 'Undo' => 'Remove', + ); + + /** + * Register routes. + */ + public function register_routes() { + \register_rest_route( + $this->namespace, + '/' . $this->rest_base, + array( + 'args' => array( + 'user_id' => array( + 'description' => 'The ID of the user or actor.', + 'type' => 'integer', + 'validate_callback' => array( $this, 'validate_user_id' ), + ), + 'collection' => array( + 'description' => 'The collection to stream (outbox or inbox).', + 'type' => 'string', + 'enum' => array( 'outbox', 'inbox' ), + ), + ), + array( + 'methods' => \WP_REST_Server::READABLE, + 'callback' => array( $this, 'get_items' ), + 'permission_callback' => array( $this, 'get_items_permissions_check' ), + ), + ) + ); + + \register_rest_route( + $this->namespace, + '/proxy-event-stream', + array( + array( + 'methods' => \WP_REST_Server::READABLE, + 'callback' => array( $this, 'get_proxy_stream' ), + 'permission_callback' => array( $this, 'get_proxy_permissions_check' ), + 'args' => array( + 'url' => array( + 'description' => 'The remote eventStream URL to proxy.', + 'type' => 'string', + 'format' => 'uri', + 'required' => true, + 'sanitize_callback' => 'sanitize_url', + ), + ), + ), + ) + ); + } + + /** + * Validates the user_id parameter. + * + * @param mixed $user_id The user_id parameter. + * @return bool|\WP_Error True if the user_id is valid, WP_Error otherwise. + */ + public function validate_user_id( $user_id ) { + $user = Actors::get_by_id( $user_id ); + if ( \is_wp_error( $user ) ) { + return $user; + } + + return true; + } + + /** + * Check permissions for the SSE stream endpoint. + * + * Requires OAuth authentication with the `push` scope. + * + * @param \WP_REST_Request $request Full details about the request. + * @return bool|\WP_Error True if authorized, WP_Error otherwise. + */ + public function get_items_permissions_check( $request ) { + $oauth_result = OAuth_Server::check_oauth_permission( $request, Scope::PUSH ); + if ( true !== $oauth_result ) { + return $oauth_result; + } + + $user_id = $request->get_param( 'user_id' ); + + if ( null === $user_id ) { + return true; + } + + return $this->verify_owner( $request ); + } + + /** + * Check permissions for the proxy event stream endpoint. + * + * @param \WP_REST_Request $request Full details about the request. + * @return bool|\WP_Error True if authorized, WP_Error otherwise. + */ + public function get_proxy_permissions_check( $request ) { + return OAuth_Server::check_oauth_permission( $request, Scope::PUSH ); + } + + /** + * Stream SSE events for a collection. + * + * This method sends raw SSE output and calls exit — it does not + * return a WP_REST_Response. + * + * @param \WP_REST_Request $request Full details about the request. + * @return void + */ + public function get_items( $request ) { + $user_id = $request->get_param( 'user_id' ); + $collection = $request->get_param( 'collection' ); + + // Allow PHP to detect client disconnects instead of auto-terminating. + ignore_user_abort( true ); + + $this->send_sse_headers(); + + // Get the latest item ID as our starting point. + $since_id = $this->get_latest_item_id( $user_id, $collection ); + $start = time(); + + // Send initial connected event. + $this->send_sse_comment( 'connected' ); + + while ( ( time() - $start ) < self::MAX_DURATION ) { + if ( \connection_aborted() ) { + break; + } + + // Check for signal transient before querying the DB. + $signal_key = sprintf( 'activitypub_sse_signal_%s_%s', $user_id, $collection ); + $signal = \get_transient( $signal_key ); + + if ( $signal ) { + \delete_transient( $signal_key ); + + $new_items = $this->get_new_items( $user_id, $collection, $since_id ); + + foreach ( $new_items as $item ) { + $this->send_sse_event( $item, $collection ); + + if ( $item->ID > $since_id ) { + $since_id = $item->ID; + } + } + } + + // Send keepalive comment. + $this->send_sse_comment( 'keepalive ' . \gmdate( 'c' ) ); + + // Flush and sleep. + $this->flush_output(); + + // phpcs:ignore WordPress.WP.AlternativeFunctions.sleep_sleep -- SSE long-polling requires blocking sleep. + sleep( self::POLL_INTERVAL ); + } + + $this->send_sse_comment( 'timeout' ); + $this->flush_output(); + + exit; + } + + /** + * Proxy a remote eventStream (not yet implemented). + * + * WordPress's HTTP API does not support streaming responses, + * so this endpoint returns 501 Not Implemented for now. + * + * @param \WP_REST_Request $request Full details about the request. + * @return \WP_Error Always returns 501. + */ + public function get_proxy_stream( $request ) { // phpcs:ignore VariableAnalysis.CodeAnalysis.VariableAnalysis.UnusedVariable + return new \WP_Error( + 'activitypub_not_implemented', + \__( 'Proxy event streaming is not yet supported.', 'activitypub' ), + array( 'status' => 501 ) + ); + } + + /** + * Send SSE-specific HTTP headers. + */ + private function send_sse_headers() { + // Clear any output buffers. + while ( ob_get_level() > 0 ) { + ob_end_clean(); + } + + \status_header( 200 ); + \header( 'Content-Type: text/event-stream' ); + \header( 'Cache-Control: no-cache' ); + \header( 'X-Accel-Buffering: no' ); + + // CORS headers for browser-based clients. + $origin = isset( $_SERVER['HTTP_ORIGIN'] ) ? \esc_url_raw( \wp_unslash( $_SERVER['HTTP_ORIGIN'] ) ) : ''; + + if ( $origin ) { + \header( 'Access-Control-Allow-Origin: ' . $origin ); + \header( 'Access-Control-Allow-Credentials: true' ); + \header( 'Vary: Origin' ); + } else { + \header( 'Access-Control-Allow-Origin: *' ); + } + } + + /** + * Send an SSE event. + * + * @param \WP_Post $item The outbox or inbox post item. + * @param string $collection The collection type ('outbox' or 'inbox'). + */ + private function send_sse_event( $item, $collection ) { + $event_type = $this->get_event_type( $item, $collection ); + $data = $this->get_event_data( $item, $collection ); + + if ( ! $data ) { + return; + } + + // phpcs:ignore WordPress.Security.EscapeOutput.OutputNotEscaped -- SSE protocol requires raw output. + echo 'event: ' . $event_type . "\n"; + // phpcs:ignore WordPress.Security.EscapeOutput.OutputNotEscaped -- SSE protocol requires raw JSON output. + echo 'data: ' . \wp_json_encode( $data ) . "\n"; + echo 'id: ' . (int) $item->ID . "\n\n"; + } + + /** + * Send an SSE comment (keepalive or informational). + * + * @param string $comment The comment text. + */ + private function send_sse_comment( $comment ) { + // phpcs:ignore WordPress.Security.EscapeOutput.OutputNotEscaped -- SSE protocol requires raw output. + echo ': ' . $comment . "\n\n"; + } + + /** + * Flush output buffers. + */ + private function flush_output() { + if ( ob_get_level() > 0 ) { + ob_flush(); + } + flush(); + } + + /** + * Get the SSE event type for an item. + * + * @param \WP_Post $item The outbox or inbox post item. + * @param string $collection The collection type. + * @return string The SSE event type (Add, Update, or Remove). + */ + private function get_event_type( $item, $collection ) { + if ( 'inbox' === $collection ) { + // Inbox items are always additions to the collection. + return 'Add'; + } + + $activity_type = \get_post_meta( $item->ID, '_activitypub_activity_type', true ); + + if ( isset( self::EVENT_TYPE_MAP[ $activity_type ] ) ) { + return self::EVENT_TYPE_MAP[ $activity_type ]; + } + + return 'Add'; + } + + /** + * Get the event data (activity JSON) for an item. + * + * @param \WP_Post $item The outbox or inbox post item. + * @param string $collection The collection type. + * @return array|null The activity data or null on failure. + */ + private function get_event_data( $item, $collection ) { + if ( 'outbox' === $collection ) { + $activity = Outbox::get_activity( $item->ID ); + + if ( \is_wp_error( $activity ) ) { + return null; + } + + return $activity->to_array( false ); + } + + // Inbox items store activity JSON directly in post_content. + $data = \json_decode( $item->post_content, true ); + + return $data ? $data : null; + } + + /** + * Get the latest item ID for a collection. + * + * @param int $user_id The user ID. + * @param string $collection The collection type. + * @return int The latest post ID or 0. + */ + private function get_latest_item_id( $user_id, $collection ) { + $post_type = 'outbox' === $collection ? Outbox::POST_TYPE : Inbox::POST_TYPE; + + $args = array( + 'post_type' => $post_type, + 'post_status' => 'any', + 'posts_per_page' => 1, + 'orderby' => 'ID', + 'order' => 'DESC', + 'fields' => 'ids', + 'no_found_rows' => true, + ); + + if ( 'outbox' === $collection ) { + // phpcs:ignore WordPress.DB.SlowDBQuery.slow_db_query_meta_query + $args['meta_query'] = array( + array( + 'key' => '_activitypub_activity_actor', + 'value' => Actors::get_type_by_id( $user_id ), + ), + ); + } else { + // phpcs:ignore WordPress.DB.SlowDBQuery.slow_db_query_meta_query + $args['meta_query'] = array( + array( + 'key' => '_activitypub_user_id', + 'value' => $user_id, + ), + ); + } + + $query = new \WP_Query( $args ); + + return ! empty( $query->posts ) ? (int) $query->posts[0] : 0; + } + + /** + * Get new items since a given ID. + * + * Uses a `posts_where` filter to add `ID > $since_id` to the query, + * since WP_Query does not natively support filtering by minimum post ID. + * + * @param int $user_id The user ID. + * @param string $collection The collection type. + * @param int $since_id Only return items with ID greater than this. + * @return \WP_Post[] Array of new post items. + */ + private function get_new_items( $user_id, $collection, $since_id ) { + $post_type = 'outbox' === $collection ? Outbox::POST_TYPE : Inbox::POST_TYPE; + + $args = array( + 'post_type' => $post_type, + 'post_status' => 'any', + 'posts_per_page' => 20, + 'orderby' => 'ID', + 'order' => 'ASC', + 'no_found_rows' => true, + ); + + if ( 'outbox' === $collection ) { + // phpcs:ignore WordPress.DB.SlowDBQuery.slow_db_query_meta_query + $args['meta_query'] = array( + array( + 'key' => '_activitypub_activity_actor', + 'value' => Actors::get_type_by_id( $user_id ), + ), + ); + } else { + // phpcs:ignore WordPress.DB.SlowDBQuery.slow_db_query_meta_query + $args['meta_query'] = array( + array( + 'key' => '_activitypub_user_id', + 'value' => $user_id, + ), + ); + } + + // Add a posts_where filter to restrict to items newer than $since_id. + if ( $since_id > 0 ) { + $where_filter = function ( $where ) use ( $since_id ) { + global $wpdb; + $where .= $wpdb->prepare( " AND {$wpdb->posts}.ID > %d", $since_id ); + return $where; + }; + \add_filter( 'posts_where', $where_filter ); + } + + $query = new \WP_Query( $args ); + + if ( $since_id > 0 ) { + \remove_filter( 'posts_where', $where_filter ); + } + + return $query->posts; + } + + /** + * Get the stream URL for a collection. + * + * @param int $user_id The user ID. + * @param string $collection The collection name ('outbox' or 'inbox'). + * @return string The stream URL. + */ + public static function get_stream_url( $user_id, $collection ) { + return get_rest_url_by_path( sprintf( 'actors/%d/%s/stream', $user_id, $collection ) ); + } + + /** + * Get the proxy event stream URL. + * + * @return string The proxy event stream URL. + */ + public static function get_proxy_url() { + return get_rest_url_by_path( 'proxy-event-stream' ); + } +} diff --git a/includes/rest/class-outbox-controller.php b/includes/rest/class-outbox-controller.php index 531c6f3496..5d2b80dc3d 100644 --- a/includes/rest/class-outbox-controller.php +++ b/includes/rest/class-outbox-controller.php @@ -187,6 +187,7 @@ public function get_items( $request ) { 'actor' => $user->get_id(), 'type' => 'OrderedCollection', 'totalItems' => (int) $outbox_query->found_posts, + 'eventStream' => Event_Stream_Controller::get_stream_url( $user_id, 'outbox' ), 'orderedItems' => array(), );