diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 33e2cf38be8..f0954e03697 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7108,6 +7108,7 @@ dependencies = [ "mockall", "once_cell", "percent-encoding", + "pin-project", "pprof", "prost 0.14.1", "prost-types 0.14.1", diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index 4fa52e6b9ea..26bed948189 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -687,7 +687,7 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> { split_id: None, }; let search_request = - search_request_from_api_request(vec![args.index_id], search_request_query_string)?; + search_request_from_api_request(vec![args.index_id], search_request_query_string, None)?; debug!(search_request=?search_request, "search-request"); let search_response: SearchResponse = single_node_search(search_request, metastore, storage_resolver).await?; diff --git a/quickwit/quickwit-proto/protos/quickwit/search.proto b/quickwit/quickwit-proto/protos/quickwit/search.proto index f50f79c0d73..205719d1b89 100644 --- a/quickwit/quickwit-proto/protos/quickwit/search.proto +++ b/quickwit/quickwit-proto/protos/quickwit/search.proto @@ -249,6 +249,9 @@ message SearchRequest { bool ignore_missing_indexes = 18; optional string split_id = 19; + + // The user agent of the client that initiated the search request. + optional string user_agent = 20; } enum CountHits { diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs index baad891ea58..c2f1aa6ae5b 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs @@ -187,6 +187,9 @@ pub struct SearchRequest { pub ignore_missing_indexes: bool, #[prost(string, optional, tag = "19")] pub split_id: ::core::option::Option<::prost::alloc::string::String>, + /// The user agent of the client that initiated the search request. + #[prost(string, optional, tag = "20")] + pub user_agent: ::core::option::Option<::prost::alloc::string::String>, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] diff --git a/quickwit/quickwit-rest-client/src/rest_client.rs b/quickwit/quickwit-rest-client/src/rest_client.rs index 1fb2b5c9812..62f45e4abc6 100644 --- a/quickwit/quickwit-rest-client/src/rest_client.rs +++ b/quickwit/quickwit-rest-client/src/rest_client.rs @@ -25,7 +25,7 @@ use quickwit_proto::ingest::Shard; use quickwit_serve::{ ListSplitsQueryParams, ListSplitsResponse, RestIngestResponse, SearchRequestQueryString, }; -use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue}; +use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue, USER_AGENT}; use reqwest::tls::Certificate; use reqwest::{ClientBuilder as ReqwestClientBuilder, Method, StatusCode, Url}; use reqwest_middleware::{ClientBuilder as ReqwestMiddlewareClientBuilder, ClientWithMiddleware}; @@ -112,6 +112,7 @@ impl Transport { } let mut request_headers = HeaderMap::new(); request_headers.insert(CONTENT_TYPE, HeaderValue::from_static(DEFAULT_CONTENT_TYPE)); + request_headers.insert(USER_AGENT, HeaderValue::from_static("qw-rest-client")); if let Some(header_map_val) = header_map { request_headers.extend(header_map_val.into_iter()); } diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs index db4083a7eed..e236fb53d06 100644 --- a/quickwit/quickwit-search/src/metrics.rs +++ b/quickwit/quickwit-search/src/metrics.rs @@ -105,9 +105,9 @@ impl SplitSearchOutcomeCounters { } pub struct SearchMetrics { - pub root_search_requests_total: IntCounterVec<1>, - pub root_search_request_duration_seconds: HistogramVec<1>, - pub root_search_targeted_splits: HistogramVec<1>, + pub root_search_requests_total: IntCounterVec<2>, + pub root_search_request_duration_seconds: HistogramVec<2>, + pub root_search_targeted_splits: HistogramVec<2>, pub leaf_search_requests_total: IntCounterVec<2>, pub leaf_search_request_duration_seconds: HistogramVec<2>, pub leaf_search_targeted_splits: HistogramVec<2>, @@ -170,14 +170,14 @@ impl Default for SearchMetrics { "Total number of root search gRPC requests processed.", "search", &[("kind", "server")], - ["status"], + ["user_agent", "status"], ), root_search_request_duration_seconds: new_histogram_vec( "root_search_request_duration_seconds", "Duration of root search gRPC requests in seconds.", "search", &[("kind", "server")], - ["status"], + ["user_agent", "status"], duration_buckets(), ), root_search_targeted_splits: new_histogram_vec( @@ -185,7 +185,7 @@ impl Default for SearchMetrics { "Number of splits targeted per root search GRPC request.", "search", &[], - ["status"], + ["user_agent", "status"], targeted_splits_buckets.clone(), ), leaf_search_requests_total: new_counter_vec( diff --git a/quickwit/quickwit-search/src/metrics_trackers.rs b/quickwit/quickwit-search/src/metrics_trackers.rs index 9539ac2e098..4714fd041e7 100644 --- a/quickwit/quickwit-search/src/metrics_trackers.rs +++ b/quickwit/quickwit-search/src/metrics_trackers.rs @@ -35,6 +35,7 @@ pub struct SearchPlanMetricsFuture { pub tracked: F, pub start: Instant, pub is_success: Option, + pub user_agent: String, } #[pinned_drop] @@ -47,7 +48,7 @@ impl PinnedDrop for SearchPlanMetricsFuture { None => "plan-cancelled", }; - let label_values = [status]; + let label_values = [normalize_user_agent(&self.user_agent), status]; SEARCH_METRICS .root_search_requests_total .with_label_values(label_values) @@ -85,13 +86,14 @@ pub struct RootSearchMetricsFuture { pub start: Instant, pub num_targeted_splits: usize, pub status: Option<&'static str>, + pub user_agent: String, } #[pinned_drop] impl PinnedDrop for RootSearchMetricsFuture { fn drop(self: Pin<&mut Self>) { let status = self.status.unwrap_or("cancelled"); - let label_values = [status]; + let label_values = [normalize_user_agent(&self.user_agent), status]; SEARCH_METRICS .root_search_requests_total .with_label_values(label_values) @@ -182,3 +184,39 @@ where F: Future> Poll::Ready(Ok(response?)) } } + +/// Simplify the user agent to limit the metric's cardinality. +pub fn normalize_user_agent(user_agent: &str) -> &str { + let ua = user_agent.trim(); + + // Browsers always start with "Mozilla/" + if ua.starts_with("Mozilla") { + return "browser"; + } + + let lower = ua.to_ascii_lowercase(); + + // Well-known CLI / library prefixes (match on the start of the lower-cased + // string so version numbers don't matter). + const CLI_PREFIXES: &[(&str, &str)] = &[ + ("curl", "curl"), + ("wget", "wget"), + ("python-httpx", "python-httpx"), + ("python-requests", "python-requests"), + ("go-http-client", "go-http-client"), + ("java", "java"), + ("okhttp", "okhttp"), + ("axios", "axios"), + ("ruby", "ruby"), + ("node-fetch", "node-fetch"), + ("node", "node"), + ]; + for (prefix, label) in CLI_PREFIXES { + if lower.starts_with(prefix) { + return label; + } + } + + // Keep short service names verbatim; truncate anything exotic. + if ua.len() <= 64 { ua } else { "other" } +} diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 370a6d442c7..e131c57ea7c 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -407,6 +407,7 @@ fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> crate::Result< count_hits: quickwit_proto::search::CountHits::Underestimate as i32, ignore_missing_indexes: req.ignore_missing_indexes, split_id: req.split_id.clone(), + user_agent: req.user_agent.clone(), }) } @@ -1263,6 +1264,7 @@ pub async fn root_search( let (split_metadatas, indexes_meta_for_leaf_search) = SearchPlanMetricsFuture { start: start_instant, + user_agent: search_request.user_agent.clone().unwrap_or_default(), tracked: plan_splits_for_root_search(&mut search_request, &mut metastore), is_success: None, } @@ -1274,6 +1276,8 @@ pub async fn root_search( // It would have been nice to add those in the context of the trace span, // but with our current logging setting, it makes logs too verbose. info!( + indexes = ?PrettySample::new(&search_request.index_id_patterns, 5), + user_agent = search_request.user_agent.as_deref().unwrap_or_default(), query_ast = search_request.query_ast.as_str(), agg = search_request.aggregation_request(), start_ts = ?(search_request.start_timestamp()..search_request.end_timestamp()), @@ -1300,6 +1304,7 @@ pub async fn root_search( let mut search_response_result = RootSearchMetricsFuture { start: start_instant, + user_agent: search_request.user_agent.clone().unwrap_or_default(), tracked: root_search_aux( searcher_context, &indexes_meta_for_leaf_search, diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml index 363065a3403..a30df6519bf 100644 --- a/quickwit/quickwit-serve/Cargo.toml +++ b/quickwit/quickwit-serve/Cargo.toml @@ -31,6 +31,7 @@ itertools = { workspace = true } mime_guess = { workspace = true } once_cell = { workspace = true } percent-encoding = { workspace = true } +pin-project = { workspace = true } pprof = { workspace = true, optional = true } prost = { workspace = true } prost-types = { workspace = true } diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs index b8d2343f666..071f080fe81 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs @@ -14,6 +14,7 @@ use bytes::Bytes; use bytesize::ByteSize; +use http::HeaderValue; use serde::de::DeserializeOwned; use warp::reject::LengthRequired; use warp::{Filter, Rejection}; @@ -165,14 +166,21 @@ pub(crate) fn elastic_resolve_index_filter() } #[utoipa::path(get, tag = "Count", path = "/{index}/_count")] -pub(crate) fn elastic_index_count_filter() --> impl Filter, SearchQueryParamsCount, SearchBody), Error = Rejection> + Clone -{ +pub(crate) fn elastic_index_count_filter() -> impl Filter< + Extract = ( + Vec, + SearchQueryParamsCount, + SearchBody, + Option, + ), + Error = Rejection, +> + Clone { warp::path!("_elastic" / String / "_count") .and_then(extract_index_id_patterns) .and(warp::get().or(warp::post()).unify()) .and(warp::query()) .and(json_or_empty()) + .and(warp::header::optional::("user-agent")) } #[utoipa::path(delete, tag = "Indexes", path = "/{index}")] @@ -222,23 +230,33 @@ pub(crate) fn elastic_cat_indices_filter() } #[utoipa::path(get, tag = "Search", path = "/{index}/_search")] -pub(crate) fn elastic_index_search_filter() --> impl Filter, SearchQueryParams, SearchBody), Error = Rejection> + Clone { +pub(crate) fn elastic_index_search_filter() -> impl Filter< + Extract = ( + Vec, + SearchQueryParams, + SearchBody, + Option, + ), + Error = Rejection, +> + Clone { warp::path!("_elastic" / String / "_search") .and_then(extract_index_id_patterns) .and(warp::get().or(warp::post()).unify()) .and(warp::query()) .and(json_or_empty()) + .and(warp::header::optional::("user-agent")) } #[utoipa::path(post, tag = "Search", path = "/_msearch")] pub(crate) fn elastic_multi_search_filter() --> impl Filter + Clone { +-> impl Filter), Error = Rejection> + Clone +{ warp::path!("_elastic" / "_msearch") .and(warp::body::content_length_limit(BODY_LENGTH_LIMIT.as_u64())) .and(warp::body::bytes()) .and(warp::post()) .and(warp::query()) + .and(warp::header::optional::("user-agent")) } fn merge_scroll_body_params( diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index 0fc234ebd0c..466f575fe51 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -21,6 +21,7 @@ use bytes::Bytes; use elasticsearch_dsl::search::Hit as ElasticHit; use elasticsearch_dsl::{HitsMetadata, ShardStatistics, Source, TotalHits, TotalHitsRelation}; use futures_util::StreamExt; +use http::HeaderValue; use itertools::Itertools; use quickwit_cluster::Cluster; use quickwit_common::truncate_str; @@ -307,6 +308,7 @@ fn build_request_for_es_api( index_id_patterns: Vec, search_params: SearchQueryParams, search_body: SearchBody, + user_agent: Option, ) -> Result<(quickwit_proto::search::SearchRequest, bool), ElasticsearchError> { let default_operator = search_params.default_operator.unwrap_or(BooleanOperand::Or); // The query string, if present, takes priority over what can be in the request @@ -413,6 +415,7 @@ fn build_request_for_es_api( count_hits, ignore_missing_indexes, split_id: None, + user_agent: user_agent.and_then(|h| h.to_str().ok().map(str::to_owned)), }, has_doc_id_field, )) @@ -490,12 +493,13 @@ async fn es_compat_index_count( index_id_patterns: Vec, search_params: SearchQueryParamsCount, search_body: SearchBody, + user_agent: Option, search_service: Arc, ) -> Result { let mut search_params: SearchQueryParams = search_params.into(); search_params.track_total_hits = Some(TrackTotalHits::Track(true)); let (search_request, _append_shard_doc) = - build_request_for_es_api(index_id_patterns, search_params, search_body)?; + build_request_for_es_api(index_id_patterns, search_params, search_body, user_agent)?; let search_response: SearchResponse = search_service.root_search(search_request).await?; let search_response_rest: ElasticsearchCountResponse = ElasticsearchCountResponse { count: search_response.num_hits, @@ -507,6 +511,7 @@ async fn es_compat_index_search( index_id_patterns: Vec, search_params: SearchQueryParams, search_body: SearchBody, + user_agent: Option, search_service: Arc, ) -> Result { if search_params.scroll.is_some() && !search_params.allow_partial_search_results() { @@ -520,7 +525,7 @@ async fn es_compat_index_search( let start_instant = Instant::now(); let allow_partial_search_results = search_params.allow_partial_search_results(); let (search_request, append_shard_doc) = - build_request_for_es_api(index_id_patterns, search_params, search_body)?; + build_request_for_es_api(index_id_patterns, search_params, search_body, user_agent)?; let search_response: SearchResponse = search_service.root_search(search_request).await?; let elapsed = start_instant.elapsed(); let mut search_response_rest: ElasticsearchResponse = convert_to_es_search_response( @@ -810,6 +815,7 @@ fn convert_hit( async fn es_compat_index_multi_search( payload: Bytes, multi_search_params: MultiSearchQueryParams, + user_agent: Option, search_service: Arc, ) -> Result { let mut search_requests = Vec::new(); @@ -864,8 +870,12 @@ async fn es_compat_index_multi_search( if let Some(extra_filters) = &multi_search_params.extra_filters { search_query_params.extra_filters = Some(extra_filters.to_vec()); } - let es_request = - build_request_for_es_api(index_ids_patterns, search_query_params, search_body)?; + let es_request = build_request_for_es_api( + index_ids_patterns, + search_query_params, + search_body, + user_agent.clone(), + )?; search_requests.push(es_request); } diff --git a/quickwit/quickwit-serve/src/metrics.rs b/quickwit/quickwit-serve/src/metrics.rs index c1e4fa24d93..333e407a8ad 100644 --- a/quickwit/quickwit-serve/src/metrics.rs +++ b/quickwit/quickwit-serve/src/metrics.rs @@ -19,8 +19,8 @@ use quickwit_common::metrics::{ }; pub struct ServeMetrics { - pub http_requests_total: IntCounterVec<2>, - pub request_duration_secs: HistogramVec<2>, + pub http_requests_total: IntCounterVec<1>, + pub request_duration_secs: HistogramVec<1>, pub ongoing_requests: IntGaugeVec<1>, pub pending_requests: IntGaugeVec<1>, pub circuit_break_total: IntCounter, @@ -40,14 +40,14 @@ impl Default for ServeMetrics { "Total number of HTTP requests processed.", "", &[], - ["method", "status_code"], + ["status_code"], ), request_duration_secs: new_histogram_vec( "request_duration_secs", "Response time in seconds", "", &[], - ["method", "status_code"], + ["status_code"], // last bucket is 163.84s quickwit_common::metrics::exponential_buckets(0.02, 2.0, 14).unwrap(), ), diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index ae33bb50a08..f8f26430a21 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -13,12 +13,17 @@ // limitations under the License. use std::fmt::Formatter; +use std::future::Future; use std::io; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll, ready}; +use std::time::Instant; use hyper_util::rt::{TokioExecutor, TokioIo}; use hyper_util::server::conn::auto::Builder; use hyper_util::service::TowerToHyperService; +use pin_project::{pin_project, pinned_drop}; use quickwit_common::tower::BoxFutureInfaillible; use quickwit_config::{disable_ingest_v1, enable_ingest_v2}; use quickwit_search::SearchService; @@ -26,12 +31,11 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, TcpStream}; use tokio_rustls::TlsAcceptor; use tokio_util::either::Either; -use tower::ServiceBuilder; +use tower::{Layer, Service, ServiceBuilder}; use tower_http::compression::CompressionLayer; use tower_http::compression::predicate::{NotForContentType, Predicate, SizeAbove}; use tower_http::cors::{AllowOrigin, CorsLayer}; use tracing::{error, info}; -use warp::filters::log::Info; use warp::hyper::http::HeaderValue; use warp::hyper::{Method, StatusCode, http}; use warp::{Filter, Rejection, Reply, redirect}; @@ -79,6 +83,111 @@ impl std::fmt::Display for TooManyRequests { } } +/// Tower layer that records HTTP request metrics for every request, including +/// cancelled ones. +#[derive(Clone)] +struct HttpMetricsLayer; + +impl Layer for HttpMetricsLayer { + type Service = HttpMetricsService; + fn layer(&self, inner: S) -> Self::Service { + HttpMetricsService { inner } + } +} + +#[derive(Clone)] +struct HttpMetricsService { + inner: S, +} + +impl Service> for HttpMetricsService +where S: Service< + http::Request, + Response = http::Response, + Error = std::convert::Infallible, + > +{ + type Response = S::Response; + type Error = S::Error; + type Future = HttpMetricsFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: http::Request) -> Self::Future { + let method = req.method().to_string(); + let path = req.uri().path().to_string(); + let user_agent = req + .headers() + .get(http::header::USER_AGENT) + .and_then(|h| h.to_str().ok()) + .unwrap_or_default() + .to_string(); + HttpMetricsFuture { + inner: self.inner.call(req), + start: Instant::now(), + method, + status: None, + path, + user_agent, + } + } +} + +#[pin_project(PinnedDrop)] +struct HttpMetricsFuture { + #[pin] + inner: F, + start: Instant, + method: String, + path: String, + user_agent: String, + /// `None` while in-flight (including if dropped before completion). + /// `Some(status)` once the response future resolves. + status: Option, +} + +#[pinned_drop] +impl PinnedDrop for HttpMetricsFuture { + fn drop(self: Pin<&mut Self>) { + let status = self.status.as_deref().unwrap_or("cancelled"); + let duration = self.start.elapsed(); + info!( + method = self.method, + path = self.path, + status = status, + elapsed_ms = duration.as_millis(), + ua = self.user_agent, + "request finished" + ); + crate::SERVE_METRICS + .http_requests_total + .with_label_values([status]) + .inc(); + crate::SERVE_METRICS + .request_duration_secs + .with_label_values([status]) + .observe(duration.as_secs_f64()); + } +} + +impl Future for HttpMetricsFuture +where F: Future, std::convert::Infallible>> +{ + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let result = ready!(this.inner.poll(cx)); + *this.status = Some(match &result { + Ok(response) => response.status().as_str().to_owned(), + Err(infallible) => match *infallible {}, + }); + Poll::Ready(result) + } +} + /// Env variable key to define the minimum size above which a response should be compressed. /// If unset, no compression is applied. const QW_MINIMUM_COMPRESSION_SIZE_KEY: &str = "QW_MINIMUM_COMPRESSION_SIZE"; @@ -133,19 +242,6 @@ pub(crate) async fn start_rest_server( readiness_trigger: BoxFutureInfaillible<()>, shutdown_signal: BoxFutureInfaillible<()>, ) -> anyhow::Result<()> { - let request_counter = warp::log::custom(|info: Info| { - let elapsed = info.elapsed(); - let status = info.status(); - let label_values: [&str; 2] = [info.method().as_str(), status.as_str()]; - crate::SERVE_METRICS - .request_duration_secs - .with_label_values(label_values) - .observe(elapsed.as_secs_f64()); - crate::SERVE_METRICS - .http_requests_total - .with_label_values(label_values) - .inc(); - }); // Docs routes let api_doc = warp::path("openapi.json") .and(warp::get()) @@ -200,7 +296,6 @@ pub(crate) async fn start_rest_server( .or(health_check_routes) .or(metrics_routes) .or(developer_routes) - .with(request_counter) .recover(recover_fn_final) .with(extra_headers) .boxed(); @@ -210,6 +305,7 @@ pub(crate) async fn start_rest_server( let cors = build_cors(&quickwit_services.node_config.rest_config.cors_allow_origins); let service = ServiceBuilder::new() + .layer(HttpMetricsLayer) .layer( CompressionLayer::new() .zstd(true) diff --git a/quickwit/quickwit-serve/src/search_api/rest_handler.rs b/quickwit/quickwit-serve/src/search_api/rest_handler.rs index 671d7a6c2fa..ec957bfb007 100644 --- a/quickwit/quickwit-serve/src/search_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/search_api/rest_handler.rs @@ -15,6 +15,7 @@ use std::convert::TryFrom; use std::sync::Arc; +use http::HeaderValue; use percent_encoding::percent_decode_str; use quickwit_config::validate_index_id_pattern; use quickwit_proto::search::{CountHits, SortField, SortOrder}; @@ -246,6 +247,7 @@ mod count_hits_from_bool { pub fn search_request_from_api_request( index_id_patterns: Vec, search_request: SearchRequestQueryString, + user_agent: Option, ) -> Result { // The query ast below may still contain user input query. The actual // parsing of the user query will happen in the root service, and might require @@ -269,6 +271,7 @@ pub fn search_request_from_api_request( count_hits: search_request.count_all.into(), ignore_missing_indexes: false, split_id: search_request.split_id, + user_agent, }; Ok(search_request) } @@ -276,10 +279,12 @@ pub fn search_request_from_api_request( async fn search_endpoint( index_id_patterns: Vec, search_request: SearchRequestQueryString, + user_agent: Option, search_service: &dyn SearchService, ) -> Result { let allow_failed_splits = search_request.allow_failed_splits; - let search_request = search_request_from_api_request(index_id_patterns, search_request)?; + let search_request = + search_request_from_api_request(index_id_patterns, search_request, user_agent)?; let search_response = search_service .root_search(search_request) @@ -298,20 +303,24 @@ async fn search_endpoint( } fn search_get_filter() --> impl Filter, SearchRequestQueryString), Error = Rejection> + Clone { +-> impl Filter, SearchRequestQueryString, Option), Error = Rejection> ++ Clone { warp::path!(String / "search") .and_then(extract_index_id_patterns) .and(warp::get()) .and(warp::query()) + .and(warp::header::optional::("user-agent")) } fn search_post_filter() --> impl Filter, SearchRequestQueryString), Error = Rejection> + Clone { +-> impl Filter, SearchRequestQueryString, Option), Error = Rejection> ++ Clone { warp::path!(String / "search") .and_then(extract_index_id_patterns) .and(warp::post()) .and(warp::body::content_length_limit(1024 * 1024)) .and(warp::body::json()) + .and(warp::header::optional::("user-agent")) } fn search_plan_get_filter() @@ -334,11 +343,18 @@ fn search_plan_post_filter() async fn search( index_id_patterns: Vec, search_request: SearchRequestQueryString, + user_agent: Option, search_service: Arc, ) -> impl warp::Reply { info!(request =? search_request, "search"); let body_format = search_request.format; - let result = search_endpoint(index_id_patterns, search_request, &*search_service).await; + let result = search_endpoint( + index_id_patterns, + search_request, + user_agent.and_then(|h| h.to_str().ok().map(str::to_owned)), + &*search_service, + ) + .await; into_rest_api_response(result, body_format) } @@ -349,7 +365,8 @@ async fn search_plan( ) -> impl warp::Reply { let body_format = search_request.format; let result: Result = async { - let plan_request = search_request_from_api_request(index_id_patterns, search_request)?; + let plan_request = + search_request_from_api_request(index_id_patterns, search_request, None)?; let plan_response = search_service.search_plan(plan_request).await?; let response = serde_json::from_str(&plan_response.result)?; Ok(response) @@ -522,7 +539,7 @@ mod tests { #[tokio::test] async fn test_rest_search_api_route_post() { let rest_search_api_filter = search_post_filter(); - let (indexes, req) = warp::test::request() + let (indexes, req, _) = warp::test::request() .method("POST") .path("/quickwit-demo-index/search") .json(&true) @@ -550,7 +567,7 @@ mod tests { #[tokio::test] async fn test_rest_search_api_route_post_multi_indexes() { let rest_search_api_filter = search_post_filter(); - let (indexes, req) = warp::test::request() + let (indexes, req, _) = warp::test::request() .method("POST") .path("/quickwit-demo-index,quickwit-demo,quickwit-demo-index-*/search") .json(&true) @@ -605,7 +622,7 @@ mod tests { #[tokio::test] async fn test_rest_search_api_route_simple() { let rest_search_api_filter = search_get_filter(); - let (indexes, req) = warp::test::request() + let (indexes, req, _) = warp::test::request() .path( "/quickwit-demo-index/search?query=*&end_timestamp=1450720000&max_hits=10&\ start_offset=22", @@ -633,7 +650,7 @@ mod tests { #[tokio::test] async fn test_rest_search_api_route_count_all() { let rest_search_api_filter = search_get_filter(); - let (indexes, req) = warp::test::request() + let (indexes, req, _) = warp::test::request() .path("/quickwit-demo-index/search?query=*&count_all=true") .filter(&rest_search_api_filter) .await @@ -651,7 +668,7 @@ mod tests { } ); let rest_search_api_filter = search_get_filter(); - let (indexes, req) = warp::test::request() + let (indexes, req, _) = warp::test::request() .path("/quickwit-demo-index/search?query=*&count_all=false") .filter(&rest_search_api_filter) .await @@ -673,7 +690,7 @@ mod tests { #[tokio::test] async fn test_rest_search_api_route_simple_default_num_hits_default_offset() { let rest_search_api_filter = search_get_filter(); - let (indexes, req) = warp::test::request() + let (indexes, req, _) = warp::test::request() .path( "/quickwit-demo-index/search?query=*&end_timestamp=1450720000&search_field=title,\ body", @@ -701,7 +718,7 @@ mod tests { #[tokio::test] async fn test_rest_search_api_route_simple_format() { let rest_search_api_filter = search_get_filter(); - let (indexes, req) = warp::test::request() + let (indexes, req, _) = warp::test::request() .path("/quickwit-demo-index/search?query=*&format=json") .filter(&rest_search_api_filter) .await @@ -826,7 +843,7 @@ mod tests { "/quickwit-demo-index/search?query=*&format=json&sort_by={sort_by_query_param}" ); let rest_search_api_filter = search_get_filter(); - let (_, req) = warp::test::request() + let (_, req, _) = warp::test::request() .path(&path) .filter(&rest_search_api_filter) .await @@ -840,7 +857,7 @@ mod tests { } let rest_search_api_filter = search_get_filter(); - let (_, req) = warp::test::request() + let (_, req, _) = warp::test::request() .path("/quickwit-demo-index/search?query=*&format=json&sort_by_field=fiel1") .filter(&rest_search_api_filter) .await