diff --git a/Cargo.lock b/Cargo.lock index 4618a68..6179d57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -104,13 +104,40 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "itoa", + "matchit 0.7.3", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower 0.5.3", + "tower-layer", + "tower-service", +] + [[package]] name = "axum" version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b52af3cb4058c895d37317bb27508dccc8e5f2d39454016b297bf4a400597b8" dependencies = [ - "axum-core", + "axum-core 0.5.6", "axum-macros", "bytes", "futures-util", @@ -118,7 +145,7 @@ dependencies = [ "http-body", "http-body-util", "itoa", - "matchit", + "matchit 0.8.4", "memchr", "mime", "percent-encoding", @@ -127,7 +154,27 @@ dependencies = [ "serde_json", "serde_path_to_error", "sync_wrapper", - "tower", + "tower 0.5.3", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", "tower-layer", "tower-service", ] @@ -165,7 +212,7 @@ dependencies = [ name = "axum-router" version = "0.1.0" dependencies = [ - "axum", + "axum 0.8.8", "serde", "spin-sdk", "tower-service", @@ -916,6 +963,20 @@ dependencies = [ "tonic-build", ] +[[package]] +name = "grpc-streaming" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-stream", + "futures", + "prost", + "spin-sdk", + "tokio-stream", + "tonic", + "tonic-build", +] + [[package]] name = "h2" version = "0.4.13" @@ -1353,6 +1414,12 @@ dependencies = [ "libc", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "matchit" version = "0.8.4" @@ -2029,7 +2096,7 @@ name = "send-request" version = "0.1.0" dependencies = [ "anyhow", - "axum", + "axum 0.8.8", "http", "spin-sdk", ] @@ -2510,6 +2577,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" dependencies = [ "async-trait", + "axum 0.7.9", "base64", "bytes", "http", @@ -2519,6 +2587,7 @@ dependencies = [ "pin-project", "prost", "tokio-stream", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -2538,6 +2607,20 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tower-layer", + "tower-service", +] + [[package]] name = "tower" version = "0.5.3" diff --git a/Cargo.toml b/Cargo.toml index 8059851..64c1aad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ resolver = "2" members = [ "examples/grpc", + "examples/grpc-streaming", "examples/hello-world", "examples/http-axum-router", "examples/http-concurrent-outbound-calls", diff --git a/examples/grpc-streaming/Cargo.toml b/examples/grpc-streaming/Cargo.toml new file mode 100644 index 0000000..ff4e019 --- /dev/null +++ b/examples/grpc-streaming/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "grpc-streaming" +version = "0.1.0" +edition = "2024" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +anyhow = "1" +async-stream = "0.3" +futures = "0.3" +prost = "0.13" +spin-sdk = { path = "../../crates/spin-sdk", features = ["grpc"] } +tokio-stream = "0.1" +tonic = { version = "0.12", default-features = false, features = ["codegen", "prost", "router"] } + +[build-dependencies] +tonic-build = { version = "0.12", default-features = false, features = ["prost"] } diff --git a/examples/grpc-streaming/README.md b/examples/grpc-streaming/README.md new file mode 100644 index 0000000..219ea15 --- /dev/null +++ b/examples/grpc-streaming/README.md @@ -0,0 +1,48 @@ +# gRPC Streaming + +A Spin HTTP component that serves gRPC endpoints using [tonic](https://github.com/hyperium/tonic), +demonstrating all combinations of unary and streaming requests and responses. + +## Prerequisites + +- [protoc](https://grpc.io/docs/protoc-installation/) (Protocol Buffers compiler) +- `wasm32-wasip2` target: `rustup target add wasm32-wasip2` + +## Build and Run + +```sh +spin up --build --sqlite @db.sql +``` + +## Test with grpcurl + +> **Note:** These commands pass `-import-path` and `-proto` because the server +> does not implement gRPC server reflection. + +Unary call: + +```sh +grpcurl -plaintext -import-path proto -proto route_guide.proto -d '{"latitude":18,"longitude":19}' localhost:3000 routeguide.RouteGuide/GetFeature +``` + +Server-streaming call: + +```sh + grpcurl -plaintext -import-path proto -proto route_guide.proto -d '{"lo":{"latitude":12,"longitude":10},"hi":{"latitude":28,"longitude":25}}' localhost:3000 routeguide.RouteGuide/ListFeatures +``` + +Client-streaming call: + +```sh +grpcurl -plaintext -import-path proto -proto route_guide.proto -d '{"latitude":18,"longitude":19}{"latitude":12,"longitude":20}{"latitude":13,"longitude":17}{"latitude":14,"longitude":18}' localhost:3000 routeguide.RouteGuide/RecordRoute +``` + +Client- and server-streaming: + +```sh +grpcurl -plaintext -import-path proto -proto route_guide.proto -d '{"location":{"latitude":18,"longitude":19},"message":"hello from fang rock!"}{"location":{"latitude":12,"longitude":20},"message":"i summited mt hobbes!"}' localhost:3000 routeguide.RouteGuide/RouteChat +grpcurl -plaintext -import-path proto -proto route_guide.proto -d '{"location":{"latitude":18,"longitude":19},"message":"farewell from fang rock!"}' localhost:3000 routeguide.RouteGuide/RouteChat +grpcurl -plaintext -import-path proto -proto route_guide.proto -d '{"location":{"latitude":12,"longitude":20},"message":"i was impaled on sharp claws at mt hobbes!"}{"location":{"latitude":18,"longitude":19},"message":"i got nibbled by rutans at fang rock!"}' localhost:3000 routeguide.RouteGuide/RouteChat +``` + +(Multiple commands because the routeguide scenario needs you to build up some history at a location to see interesting responses.) diff --git a/examples/grpc-streaming/build.rs b/examples/grpc-streaming/build.rs new file mode 100644 index 0000000..3aaa3c1 --- /dev/null +++ b/examples/grpc-streaming/build.rs @@ -0,0 +1,7 @@ +fn main() { + tonic_build::configure() + .type_attribute("routeguide.Point", "#[derive(Hash)]") + .build_transport(false) + .compile_protos(&["proto/route_guide.proto"], &[""]) + .unwrap(); +} diff --git a/examples/grpc-streaming/db.sql b/examples/grpc-streaming/db.sql new file mode 100644 index 0000000..5717b8e --- /dev/null +++ b/examples/grpc-streaming/db.sql @@ -0,0 +1,21 @@ +CREATE TABLE IF NOT EXISTS features ( + lat NUMBER, + long NUMBER, + name TEXT +); + +DELETE FROM features; +INSERT INTO features(lat, long, name) VALUES (12, 20, 'Mount Hobbes'); +INSERT INTO features(lat, long, name) VALUES (30, 8, 'Upper Rosie'); +INSERT INTO features(lat, long, name) VALUES (14, 18, 'Slats'' Food Crater'); +INSERT INTO features(lat, long, name) VALUES (25, 30, 'Forest of Smoke'); +INSERT INTO features(lat, long, name) VALUES (22, 7, 'The Great Splodge'); +INSERT INTO features(lat, long, name) VALUES (35, 21, 'Kiki Point'); +INSERT INTO features(lat, long, name) VALUES (18, 19, 'Fang Rock'); + +CREATE TABLE IF NOT EXISTS route_notes( + seq_no INTEGER PRIMARY KEY AUTOINCREMENT, + lat NUMBER, + long NUMBER, + msg_text TEXT +); diff --git a/examples/grpc-streaming/proto/route_guide.proto b/examples/grpc-streaming/proto/route_guide.proto new file mode 100644 index 0000000..fe21e43 --- /dev/null +++ b/examples/grpc-streaming/proto/route_guide.proto @@ -0,0 +1,110 @@ +// Copyright 2015 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "io.grpc.examples.routeguide"; +option java_outer_classname = "RouteGuideProto"; + +package routeguide; + +// Interface exported by the server. +service RouteGuide { + // A simple RPC. + // + // Obtains the feature at a given position. + // + // A feature with an empty name is returned if there's no feature at the given + // position. + rpc GetFeature(Point) returns (Feature) {} + + // A server-to-client streaming RPC. + // + // Obtains the Features available within the given Rectangle. Results are + // streamed rather than returned at once (e.g. in a response message with a + // repeated field), as the rectangle may cover a large area and contain a + // huge number of features. + rpc ListFeatures(Rectangle) returns (stream Feature) {} + + // A client-to-server streaming RPC. + // + // Accepts a stream of Points on a route being traversed, returning a + // RouteSummary when traversal is completed. + rpc RecordRoute(stream Point) returns (RouteSummary) {} + + // A Bidirectional streaming RPC. + // + // Accepts a stream of RouteNotes sent while a route is being traversed, + // while receiving other RouteNotes (e.g. from other users). + rpc RouteChat(stream RouteNote) returns (stream RouteNote) {} +} + +// Points are represented as latitude-longitude pairs in the E7 representation +// (degrees multiplied by 10**7 and rounded to the nearest integer). +// Latitudes should be in the range +/- 90 degrees and longitude should be in +// the range +/- 180 degrees (inclusive). +message Point { + int32 latitude = 1; + int32 longitude = 2; +} + +// A latitude-longitude rectangle, represented as two diagonally opposite +// points "lo" and "hi". +message Rectangle { + // One corner of the rectangle. + Point lo = 1; + + // The other corner of the rectangle. + Point hi = 2; +} + +// A feature names something at a given point. +// +// If a feature could not be named, the name is empty. +message Feature { + // The name of the feature. + string name = 1; + + // The point where the feature is detected. + Point location = 2; +} + +// A RouteNote is a message sent while at a given point. +message RouteNote { + // The location from which the message is sent. + Point location = 1; + + // The message to be sent. + string message = 2; +} + +// A RouteSummary is received in response to a RecordRoute rpc. +// +// It contains the number of individual points received, the number of +// detected features, and the total distance covered as the cumulative sum of +// the distance between each point. +message RouteSummary { + // The number of points received. + int32 point_count = 1; + + // The number of known features passed while traversing the route. + int32 feature_count = 2; + + // The distance covered in metres. + int32 distance = 3; + + // The duration of the traversal in seconds. + int32 elapsed_time = 4; +} diff --git a/examples/grpc-streaming/spin.toml b/examples/grpc-streaming/spin.toml new file mode 100644 index 0000000..1932447 --- /dev/null +++ b/examples/grpc-streaming/spin.toml @@ -0,0 +1,20 @@ +#:schema https://schemas.spinframework.dev/spin/manifest-v2/latest.json + +spin_manifest_version = 2 + +[application] +name = "grpc-streaming" +version = "0.1.0" +authors = ["The Spin authors"] +description = "An example gRPC application with bidirectional streaming" + +[[trigger.http]] +route = "/..." +component = "grpc-streaming" + +[component.grpc-streaming] +source = "../../target/wasm32-wasip2/release/grpc_streaming.wasm" +sqlite_databases = ["default"] +[component.grpc-streaming.build] +command = "cargo build --target wasm32-wasip2 --release" +watch = ["src/**/*.rs", "Cargo.toml", "proto/**/*.proto"] diff --git a/examples/grpc-streaming/src/lib.rs b/examples/grpc-streaming/src/lib.rs new file mode 100644 index 0000000..9ba5e02 --- /dev/null +++ b/examples/grpc-streaming/src/lib.rs @@ -0,0 +1,277 @@ +pub mod route_guide { + tonic::include_proto!("routeguide"); +} + +use route_guide::route_guide_server::RouteGuide; +use route_guide::route_guide_server::RouteGuideServer; + +use futures::{SinkExt, Stream}; +use spin_sdk::{ + sqlite::{self, Value}, + wasip3, +}; +use tonic::{Request, Response, Status, Streaming}; + +#[spin_sdk::http_service] +async fn handle(req: spin_sdk::http::Request) -> impl spin_sdk::http::IntoResponse { + spin_sdk::http::grpc::serve(RouteGuideServer::new(Svc), req).await +} + +struct Svc; + +#[tonic::async_trait] +impl RouteGuide for Svc { + async fn get_feature( + &self, + request: Request, + ) -> Result, Status> { + let conn = sqlite::Connection::open_default() + .await + .map_err(as_status)?; + + let location = request.into_inner(); + + let name = feature_name_at(&conn, location) + .await + .map_err(as_status)? + .unwrap_or_default(); + + let feat = route_guide::Feature { + name, + location: Some(location), + }; + + Ok(Response::new(feat)) + } + + type ListFeaturesStream = std::pin::Pin< + Box> + Send + 'static>, + >; + + async fn list_features( + &self, + request: Request, + ) -> Result, Status> { + // Helper function to let us use `?` syntax to manage errors. + async fn list_impl( + tx: &mut futures::channel::mpsc::Sender>, + bounds: route_guide::Rectangle, + ) -> Result<(), sqlite::Error> { + let lat1 = bounds.lo.unwrap_or_default().latitude; + let lat2 = bounds.hi.unwrap_or_default().latitude; + let min_lat = std::cmp::min(lat1, lat2).into(); + let max_lat = std::cmp::max(lat1, lat2).into(); + + let long1 = bounds.lo.unwrap_or_default().longitude; + let long2 = bounds.hi.unwrap_or_default().longitude; + let min_long = std::cmp::min(long1, long2).into(); + let max_long = std::cmp::max(long1, long2).into(); + + let conn = sqlite::Connection::open_default().await?; + + let mut features_qr = conn.execute("SELECT lat, long, name FROM features WHERE lat >= ? AND lat <= ? AND long >= ? AND long <= ?", [Value::Integer(min_lat), Value::Integer(max_lat), Value::Integer(min_long), Value::Integer(max_long)]).await?; + + while let Some(feat_row) = features_qr.next().await { + let latitude = feat_row.get::(0).unwrap_or_default(); + let longitude = feat_row.get::(1).unwrap_or_default(); + let name = feat_row + .get::<&str>(2) + .map(|s| s.to_owned()) + .unwrap_or_default(); + + let feat = route_guide::Feature { + name, + location: Some(route_guide::Point { + latitude, + longitude, + }), + }; + + if tx.send(Ok(feat)).await.is_err() { + break; + } + } + + features_qr.result().await?; + + Ok(()) + } + + let bounds = request.into_inner(); + + let (mut tx, rx) = futures::channel::mpsc::channel(1024); + + wasip3::spawn(async move { + if let Err(e) = list_impl(&mut tx, bounds).await { + _ = tx.send(Err(as_status(e))).await; + } + }); + + Ok(Response::new(Box::pin(rx))) + } + + async fn record_route( + &self, + request: Request>, + ) -> Result, Status> { + let mut req = request; + let r = req.get_mut(); + + let mut distance = 0; + let mut count = 0; + let mut feature_count = 0; + let mut last_pt = None; + let start_time = std::time::SystemTime::now(); + + let conn = sqlite::Connection::open_default() + .await + .map_err(as_status)?; + + loop { + let Some(pt) = r.message().await? else { + break; + }; + + count += 1; + + if let Some(last) = last_pt { + distance += dist(last, pt); + } + if feature_name_at(&conn, pt).await.is_ok_and(|f| f.is_some()) { + feature_count += 1; + } + + last_pt = Some(pt); + } + + let end_time = std::time::SystemTime::now(); + let elapsed_time = end_time + .duration_since(start_time) + .unwrap_or_default() + .as_secs() + .try_into() + .unwrap_or_default(); + + Ok(Response::new(route_guide::RouteSummary { + point_count: count, + feature_count, + distance, + elapsed_time, + })) + } + + type RouteChatStream = std::pin::Pin< + Box> + Send + 'static>, + >; + + async fn route_chat( + &self, + request: Request>, + ) -> Result, Status> { + // This operation is explained as: accept a stream of messages from the client; for + // each message, respond with the prior messages at the same location. + // (This means that the server never _initiates_ send messages. If it did, Spin could + // handle that, but we'd need separate receive and send tasks running concurrently. + // The routeguide sample doesn't have a scenario that shows this.) + + // Helper function to let us use `?` syntax to manage errors. + async fn insert_and_reply( + conn: &sqlite::Connection, + tx: &mut futures::channel::mpsc::Sender>, + message: route_guide::RouteNote, + ) -> Result<(), sqlite::Error> { + if let Some(location) = message.location { + let lat = location.latitude.into(); + let long = location.longitude.into(); + let message = message.message; + + let ins_qr = conn + .execute( + "INSERT INTO route_notes(lat, long, msg_text) VALUES (?, ?, ?)", + [ + Value::Integer(lat), + Value::Integer(long), + Value::Text(message), + ], + ) + .await?; + ins_qr.collect().await?; + let to_skip = conn.last_insert_rowid().await; + + let mut notes_qr = conn.execute("SELECT lat, long, msg_text FROM route_notes WHERE lat = ? AND long = ? AND rowid <> ? ORDER BY seq_no", [Value::Integer(lat), Value::Integer(long), Value::Integer(to_skip)]).await?; + + while let Some(row) = notes_qr.next().await { + let prev_msg = route_guide::RouteNote { + location: Some(route_guide::Point { + latitude: row.get::(0).unwrap_or_default(), + longitude: row.get::(1).unwrap_or_default(), + }), + message: row.get::<&str>(2).map(|s| s.to_owned()).unwrap_or_default(), + }; + if tx.send(Ok(prev_msg)).await.is_err() { + break; + } + } + + notes_qr.result().await?; + }; + + Ok(()) + } + + let mut req_stm = request.into_inner(); + + let conn = sqlite::Connection::open_default() + .await + .map_err(as_status)?; + + let (mut tx, rx) = futures::channel::mpsc::channel(1024); + + wasip3::spawn(async move { + while let Ok(Some(message)) = req_stm.message().await { + if let Err(e) = insert_and_reply(&conn, &mut tx, message).await { + _ = tx.send(Err(as_status(e))).await; + break; + } + } + }); + + Ok(tonic::Response::new(Box::pin(rx))) + } +} + +fn as_status(e: sqlite::Error) -> Status { + Status::internal(e.to_string()) +} + +async fn feature_name_at( + conn: &sqlite::Connection, + location: route_guide::Point, +) -> Result, sqlite::Error> { + let latitude = location.latitude.into(); + let longitude = location.longitude.into(); + + let mut features_qr = conn + .execute( + "SELECT name FROM features WHERE lat = ? AND long = ?", + [Value::Integer(latitude), Value::Integer(longitude)], + ) + .await?; + + let name = match features_qr.next().await { + None => { + features_qr.result().await?; // check if the None was due to an error + None + } + Some(row) => row.get::<&str>(0).map(|s| s.to_owned()), + }; + + Ok(name) +} + +fn dist(pt1: route_guide::Point, pt2: route_guide::Point) -> i32 { + let latd = pt1.latitude - pt2.latitude; + let longd = pt1.longitude - pt2.longitude; + let d = f64::sqrt((latd * latd + longd * longd) as f64); + d as i32 +}