Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
760 changes: 374 additions & 386 deletions Cargo.lock

Large diffs are not rendered by default.

63 changes: 36 additions & 27 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,23 @@ members = [
[workspace.dependencies]
anyhow = "1.0.82"
async-graphql = { version = "7.0.9" }
bytes = "1"
convert_case = "0.6.0"
derive_more = { version = "1", features = ["from", "debug"] }
futures-util = { version = "0.3.30" }
headers = "0.3.9" # previous version until hyper is updated to 1+
http = "0.2.12" # previous version until hyper is updated to 1+
headers = "0.4"
http = "1.0"
http-body = "1.0"
http-body-util = "0.1"
hyper = { version = "1.0", features = ["server", "http1", "http2"] }
hyper-util = { version = "0.1", features = ["server", "tokio", "http1", "http2"] }
indexmap = "2.2.6"
insta = { version = "1.38.0", features = ["json"] }
lazy_static = "1.4.0"
reqwest = { version = "0.11", features = [
reqwest = { version = "0.12", features = [
"json",
"rustls-tls",
"http2",
], default-features = false }
serde = { version = "1.0.200", features = ["derive"] }
serde_json = { version = "1.0.116", features = ["preserve_order"] }
Expand Down Expand Up @@ -65,6 +71,7 @@ cli = [
"dep:moka",
"dep:hyper-rustls",
"dep:rustls",
"dep:tokio-rustls",
"dep:inquire",
"dep:which",
"dep:update-informer",
Expand All @@ -77,6 +84,7 @@ cli = [
"dep:tailcall-version",
"dep:genai",
"dep:ctrlc",
"dep:reqwest-middleware",
]

# Feature flag to enable all default features.
Expand All @@ -103,6 +111,7 @@ async-std = { version = "1.12.0", features = [
] }
async-trait = "0.1.80"
base64 = "0.22.1"
bytes = "1"
cache_control = "0.2.0"
chrono = "0.4.38"
clap = { version = "4.5.4", features = ["derive"] }
Expand All @@ -126,11 +135,14 @@ getrandom = { version = "0.2.14", features = ["js"] }
headers = { workspace = true }
htpasswd-verify = { version = "0.3.0", git = "https://github.com/twistedfall/htpasswd-verify", rev = "ff14703083cbd639f7d05622b398926f3e718d61" } # fork version that is wasm compatible
http = { workspace = true }
http-cache-reqwest = { version = "0.13.0", features = [
http-body = { workspace = true }
http-body-util = { workspace = true }
http-cache-reqwest = { version = "0.14.0", features = [
"manager-moka",
], default-features = false, optional = true }
hyper = { version = "0.14.28", features = ["server"], default-features = false }
hyper-rustls = { version = "0.25.0", optional = true }
hyper = { workspace = true }
hyper-util = { workspace = true }
hyper-rustls = { version = "0.27.0", optional = true }
indenter = "0.3.3"
indexmap = { workspace = true }
inquire = { version = "0.7.5", optional = true }
Expand All @@ -149,25 +161,26 @@ nom = "7.1.3"
num = "0.4.3"
num_cpus = "1.16.0"
once_cell = "1.19.0"
opentelemetry = { version = "0.23.0", features = ["trace", "logs", "metrics"] }
opentelemetry-appender-tracing = { version = "0.4.0" }
opentelemetry-http = "0.12.0"
opentelemetry-otlp = { version = "0.16.0", features = [
opentelemetry = { version = "0.28", features = ["trace", "logs", "metrics"] }
opentelemetry-appender-tracing = { version = "0.28" }
opentelemetry-http = "0.28"
opentelemetry-otlp = { version = "0.28", features = [
"trace",
"logs",
"metrics",
# required to make grpc requests
"grpc-tonic",
"tls-roots",
], optional = true }
opentelemetry-prometheus = "0.16.0"
opentelemetry-semantic-conventions = "0.15.0"
opentelemetry-stdout = { version = "0.4.0", features = [
opentelemetry-prometheus = "0.28"
opentelemetry-semantic-conventions = "0.28"
opentelemetry-stdout = { version = "0.28", features = [
"trace",
"logs",
"metrics",
] }
opentelemetry-system-metrics = { version = "0.2.0", optional = true }
opentelemetry_sdk = { version = "0.23.0", features = [
opentelemetry-system-metrics = { version = "0.3", optional = true }
opentelemetry_sdk = { version = "0.28", features = [
"trace",
"logs",
"metrics",
Expand All @@ -184,11 +197,12 @@ protox-parse = "0.7.0"
rand = "0.8.5"
regex = "1.10.4"
reqwest = { workspace = true }
reqwest-middleware = "0.2.5"
reqwest-middleware = { version = "0.3", optional = true }
resource = "0.5.0"
rquickjs = { "version" = "0.7.0", optional = true, features = ["macro"] }
rustls = { version = "0.23.5", optional = true, features = [
"std",
"ring",
], default-features = false }
# dependencies safe for wasm:

Expand All @@ -214,11 +228,12 @@ tailcall-valid = { workspace = true }
tailcall-version = { path = "./tailcall-version", optional = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tonic = { version = "0.11.0", default-features = false }
tonic-types = "0.12.1"
tokio-rustls = { version = "0.26", optional = true }
tonic = { version = "0.12", default-features = false }
tonic-types = "0.12"
# TODO: disable some levels with features?
tracing = { workspace = true }
tracing-opentelemetry = "0.24.0"
tracing-opentelemetry = "0.29"
tracing-subscriber = { version = "0.3.18", features = [
"default",
"fmt",
Expand All @@ -243,17 +258,11 @@ rquickjs = { "version" = "0.7.0", optional = true, features = [

[dev-dependencies]
bincode = "1.3.3"
cacache = { version = "13.0.0", default-features = false, features = [
"tokio-runtime",
"mmap",
] }
cacache = { version = "13.0.0", features = ["mmap"] }
criterion = "0.5.1"
datatest-stable = "0.2.9"
flate2 = "1.0.30"
http-cache-semantics = { version = "1.0.1", default-features = false, features = [
"with_serde",
"reqwest",
] }
http-cache-semantics = { version = "2.1.0", features = ["serde"] }
httpmock = "0.7.0"
insta = { workspace = true }
maplit = "1.0.2"
Expand Down
10 changes: 6 additions & 4 deletions benches/handle_request_bench.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::sync::Arc;

use bytes::Bytes;
use criterion::Criterion;
use http::Request;
use http_body_util::Full;
use tailcall::cli::server::server_config::ServerConfig;
use tailcall::core::async_graphql_hyper::GraphQLRequest;
use tailcall::core::blueprint::Blueprint;
Expand Down Expand Up @@ -35,10 +37,10 @@ pub fn benchmark_handle_request(c: &mut Criterion) {
let req = Request::builder()
.method("POST")
.uri("http://localhost:8000/graphql")
.body(hyper::Body::from(QUERY))
.body(Full::new(Bytes::from(QUERY)))
.unwrap();

let _ = handle_request::<GraphQLRequest>(req, server_config.app_ctx.clone())
let _ = handle_request::<GraphQLRequest, _>(req, server_config.app_ctx.clone())
.await
.unwrap();
});
Expand All @@ -58,10 +60,10 @@ pub fn benchmark_handle_request(c: &mut Criterion) {
let req = Request::builder()
.method("POST")
.uri("http://localhost:8000/graphql")
.body(hyper::Body::from(QUERY))
.body(Full::new(Bytes::from(QUERY)))
.unwrap();

let _ = handle_request::<GraphQLRequest>(req, server_config.app_ctx.clone())
let _ = handle_request::<GraphQLRequest, _>(req, server_config.app_ctx.clone())
.await
.unwrap();
});
Expand Down
25 changes: 13 additions & 12 deletions src/cli/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,30 @@ use crate::core::runtime::TargetRuntime;
fn cache_metrics(runtime: &TargetRuntime) -> Result<()> {
let meter = opentelemetry::global::meter("cache");
let cache = runtime.cache.clone();
let counter = meter
let _counter = meter
.f64_observable_gauge("cache.hit_rate")
.with_description("Cache hit rate ratio")
.init();

meter.register_callback(&[counter.as_any()], move |observer| {
if let Some(hit_rate) = cache.hit_rate() {
observer.observe_f64(&counter, hit_rate, &[]);
}
})?;
.with_callback(move |observer| {
if let Some(hit_rate) = cache.hit_rate() {
observer.observe(hit_rate, &[]);
}
})
.build();

Ok(())
}

fn process_resources_metrics() -> Result<()> {
async fn process_resources_metrics() -> Result<()> {
let meter = opentelemetry::global::meter("process-resources");

opentelemetry_system_metrics::init_process_observer(meter).map_err(|err| anyhow!(err))
opentelemetry_system_metrics::init_process_observer(meter)
.await
.map_err(|err| anyhow!(err))
}

pub fn init_metrics(runtime: &TargetRuntime) -> Result<()> {
pub async fn init_metrics(runtime: &TargetRuntime) -> Result<()> {
cache_metrics(runtime)?;
process_resources_metrics()?;
process_resources_metrics().await?;

Ok(())
}
5 changes: 1 addition & 4 deletions src/cli/runtime/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ static HTTP_CLIENT_REQUEST_COUNT: Lazy<Counter<u64>> = Lazy::new(|| {
meter
.u64_counter("http.client.request.count")
.with_description("Number of outgoing requests")
.init()
.build()
});

#[derive(Default)]
Expand Down Expand Up @@ -92,9 +92,6 @@ impl NativeHttp {
.tcp_keepalive(Some(Duration::from_secs(upstream.tcp_keep_alive)))
.timeout(Duration::from_secs(upstream.timeout))
.connect_timeout(Duration::from_secs(upstream.connect_timeout))
.http2_keep_alive_interval(Some(Duration::from_secs(upstream.keep_alive_interval)))
.http2_keep_alive_timeout(Duration::from_secs(upstream.keep_alive_timeout))
.http2_keep_alive_while_idle(upstream.keep_alive_while_idle)
.pool_idle_timeout(Some(Duration::from_secs(upstream.pool_idle_timeout)))
.pool_max_idle_per_host(upstream.pool_max_idle_per_host)
.user_agent(upstream.user_agent.clone())
Expand Down
66 changes: 34 additions & 32 deletions src/cli/server/http_1.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,22 @@
use std::sync::Arc;

use hyper::service::{make_service_fn, service_fn};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tokio::sync::oneshot;

use super::server_config::ServerConfig;
use crate::core::async_graphql_hyper::{GraphQLBatchRequest, GraphQLRequest};
use crate::core::http::handle_request;
use crate::core::Errata;

pub async fn start_http_1(
sc: Arc<ServerConfig>,
server_up_sender: Option<oneshot::Sender<()>>,
) -> anyhow::Result<()> {
let addr = sc.addr();
let make_svc_single_req = make_service_fn(|_conn| {
let state = Arc::clone(&sc);
async move {
Ok::<_, anyhow::Error>(service_fn(move |req| {
handle_request::<GraphQLRequest>(req, state.app_ctx.clone())
}))
}
});

let make_svc_batch_req = make_service_fn(|_conn| {
let state = Arc::clone(&sc);
async move {
Ok::<_, anyhow::Error>(service_fn(move |req| {
handle_request::<GraphQLBatchRequest>(req, state.app_ctx.clone())
}))
}
});
let builder = hyper::Server::try_bind(&addr)
.map_err(Errata::from)?
.http1_pipeline_flush(sc.app_ctx.blueprint.server.pipeline_flush);
let listener = TcpListener::bind(&addr).await?;

super::log_launch(sc.as_ref());

if let Some(sender) = server_up_sender {
Expand All @@ -41,14 +25,32 @@ pub async fn start_http_1(
.or(Err(anyhow::anyhow!("Failed to send message")))?;
}

let server: std::prelude::v1::Result<(), hyper::Error> =
if sc.blueprint.server.enable_batch_requests {
builder.serve(make_svc_batch_req).await
} else {
builder.serve(make_svc_single_req).await
};

let result = server.map_err(Errata::from);

Ok(result?)
let enable_batch = sc.app_ctx.blueprint.server.enable_batch_requests;
let pipeline_flush = sc.app_ctx.blueprint.server.pipeline_flush;

loop {
let (stream, _) = listener.accept().await?;
let io = TokioIo::new(stream);
let app_ctx = sc.app_ctx.clone();

tokio::task::spawn(async move {
let service = service_fn(move |req| {
let app_ctx = app_ctx.clone();
async move {
if enable_batch {
handle_request::<GraphQLBatchRequest, _>(req, app_ctx).await
} else {
handle_request::<GraphQLRequest, _>(req, app_ctx).await
}
}
});

let mut conn = http1::Builder::new();
conn.pipeline_flush(pipeline_flush);

if let Err(err) = conn.serve_connection(io, service).await {
tracing::debug!("Error serving connection: {:?}", err);
}
});
}
}
Loading
Loading