Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> {
split_id: None,
};
let search_request =
search_request_from_api_request(vec![args.index_id], search_request_query_string)?;
search_request_from_api_request(vec![args.index_id], search_request_query_string, None)?;
debug!(search_request=?search_request, "search-request");
let search_response: SearchResponse =
single_node_search(search_request, metastore, storage_resolver).await?;
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,9 @@ message SearchRequest {
bool ignore_missing_indexes = 18;

optional string split_id = 19;

// The user agent of the client that initiated the search request.
optional string user_agent = 20;
}

enum CountHits {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion quickwit/quickwit-rest-client/src/rest_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use quickwit_proto::ingest::Shard;
use quickwit_serve::{
ListSplitsQueryParams, ListSplitsResponse, RestIngestResponse, SearchRequestQueryString,
};
use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue};
use reqwest::header::{CONTENT_TYPE, HeaderMap, HeaderValue, USER_AGENT};
use reqwest::tls::Certificate;
use reqwest::{ClientBuilder as ReqwestClientBuilder, Method, StatusCode, Url};
use reqwest_middleware::{ClientBuilder as ReqwestMiddlewareClientBuilder, ClientWithMiddleware};
Expand Down Expand Up @@ -112,6 +112,7 @@ impl Transport {
}
let mut request_headers = HeaderMap::new();
request_headers.insert(CONTENT_TYPE, HeaderValue::from_static(DEFAULT_CONTENT_TYPE));
request_headers.insert(USER_AGENT, HeaderValue::from_static("qw-rest-client"));
Comment thread
rdettai-sk marked this conversation as resolved.
if let Some(header_map_val) = header_map {
request_headers.extend(header_map_val.into_iter());
}
Expand Down
12 changes: 6 additions & 6 deletions quickwit/quickwit-search/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ impl SplitSearchOutcomeCounters {
}

pub struct SearchMetrics {
pub root_search_requests_total: IntCounterVec<1>,
pub root_search_request_duration_seconds: HistogramVec<1>,
pub root_search_targeted_splits: HistogramVec<1>,
pub root_search_requests_total: IntCounterVec<2>,
pub root_search_request_duration_seconds: HistogramVec<2>,
pub root_search_targeted_splits: HistogramVec<2>,
pub leaf_search_requests_total: IntCounterVec<2>,
pub leaf_search_request_duration_seconds: HistogramVec<2>,
pub leaf_search_targeted_splits: HistogramVec<2>,
Expand Down Expand Up @@ -170,22 +170,22 @@ impl Default for SearchMetrics {
"Total number of root search gRPC requests processed.",
"search",
&[("kind", "server")],
["status"],
["user_agent", "status"],
),
root_search_request_duration_seconds: new_histogram_vec(
"root_search_request_duration_seconds",
"Duration of root search gRPC requests in seconds.",
"search",
&[("kind", "server")],
["status"],
["user_agent", "status"],
duration_buckets(),
),
root_search_targeted_splits: new_histogram_vec(
"root_search_targeted_splits",
"Number of splits targeted per root search GRPC request.",
"search",
&[],
["status"],
["user_agent", "status"],
targeted_splits_buckets.clone(),
),
leaf_search_requests_total: new_counter_vec(
Expand Down
42 changes: 40 additions & 2 deletions quickwit/quickwit-search/src/metrics_trackers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct SearchPlanMetricsFuture<F> {
pub tracked: F,
pub start: Instant,
pub is_success: Option<bool>,
pub user_agent: String,
}

#[pinned_drop]
Expand All @@ -47,7 +48,7 @@ impl<F> PinnedDrop for SearchPlanMetricsFuture<F> {
None => "plan-cancelled",
};

let label_values = [status];
let label_values = [normalize_user_agent(&self.user_agent), status];
SEARCH_METRICS
.root_search_requests_total
.with_label_values(label_values)
Expand Down Expand Up @@ -85,13 +86,14 @@ pub struct RootSearchMetricsFuture<F> {
pub start: Instant,
pub num_targeted_splits: usize,
pub status: Option<&'static str>,
pub user_agent: String,
}

#[pinned_drop]
impl<F> PinnedDrop for RootSearchMetricsFuture<F> {
fn drop(self: Pin<&mut Self>) {
let status = self.status.unwrap_or("cancelled");
let label_values = [status];
let label_values = [normalize_user_agent(&self.user_agent), status];
SEARCH_METRICS
.root_search_requests_total
.with_label_values(label_values)
Expand Down Expand Up @@ -182,3 +184,39 @@ where F: Future<Output = Result<LeafSearchResponse, SearchError>>
Poll::Ready(Ok(response?))
}
}

/// Collapses a raw `User-Agent` value into a coarse label suitable for metrics.
///
/// The input is trimmed of surrounding whitespace, then classified by the first
/// rule that applies:
///
/// 1. A value starting with `Mozilla` (case-sensitive) yields `"browser"`.
/// 2. A value starting, ASCII-case-insensitively, with one of the well-known
///    client names below yields that client name in lower case, so version
///    suffixes such as `curl/8.4.0` are dropped.
/// 3. Any other value of at most 64 bytes is returned verbatim (trimmed). This
///    includes the empty string.
/// 4. Anything longer yields `"other"`.
///
/// The returned slice either borrows from `user_agent` or is a static string;
/// no allocation is performed.
///
/// NOTE(review): rule 3 forwards client-controlled text as-is, so the number of
/// distinct labels is reduced but not strictly bounded — confirm this is
/// acceptable for the metrics backend.
pub fn normalize_user_agent(user_agent: &str) -> &str {
    /// Longest unrecognized user agent, in bytes, that is kept verbatim.
    const MAX_VERBATIM_LEN: usize = 64;

    /// Well-known CLI / library names, all lower-case ASCII. Order matters:
    /// the first matching prefix wins, so `node-fetch` must precede `node`.
    const KNOWN_CLIENTS: &[&str] = &[
        "curl",
        "wget",
        "python-httpx",
        "python-requests",
        "go-http-client",
        "java",
        "okhttp",
        "axios",
        "ruby",
        "node-fetch",
        "node",
    ];

    let ua = user_agent.trim();

    // Browsers always start with "Mozilla/".
    if ua.starts_with("Mozilla") {
        return "browser";
    }

    // Compare raw bytes so that no lower-cased copy of the header has to be
    // allocated, and so that a prefix length falling inside a multi-byte
    // character simply fails to match instead of panicking.
    let ua_bytes = ua.as_bytes();
    let known_client = KNOWN_CLIENTS.iter().copied().find(|client| {
        ua_bytes
            .get(..client.len())
            .is_some_and(|head| head.eq_ignore_ascii_case(client.as_bytes()))
    });
    if let Some(client) = known_client {
        return client;
    }

    // Keep short service names verbatim; bucket anything longer as "other".
    if ua.len() <= MAX_VERBATIM_LEN {
        ua
    } else {
        "other"
    }
}
Comment thread
rdettai-sk marked this conversation as resolved.
5 changes: 5 additions & 0 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ fn simplify_search_request_for_scroll_api(req: &SearchRequest) -> crate::Result<
count_hits: quickwit_proto::search::CountHits::Underestimate as i32,
ignore_missing_indexes: req.ignore_missing_indexes,
split_id: req.split_id.clone(),
user_agent: req.user_agent.clone(),
})
}

Expand Down Expand Up @@ -1263,6 +1264,7 @@ pub async fn root_search(

let (split_metadatas, indexes_meta_for_leaf_search) = SearchPlanMetricsFuture {
start: start_instant,
user_agent: search_request.user_agent.clone().unwrap_or_default(),
tracked: plan_splits_for_root_search(&mut search_request, &mut metastore),
is_success: None,
}
Expand All @@ -1274,6 +1276,8 @@ pub async fn root_search(
// It would have been nice to add those in the context of the trace span,
// but with our current logging setting, it makes logs too verbose.
info!(
indexes = ?PrettySample::new(&search_request.index_id_patterns, 5),
user_agent = search_request.user_agent.as_deref().unwrap_or_default(),
query_ast = search_request.query_ast.as_str(),
agg = search_request.aggregation_request(),
Comment thread
rdettai-sk marked this conversation as resolved.
start_ts = ?(search_request.start_timestamp()..search_request.end_timestamp()),
Expand All @@ -1300,6 +1304,7 @@ pub async fn root_search(

let mut search_response_result = RootSearchMetricsFuture {
start: start_instant,
user_agent: search_request.user_agent.clone().unwrap_or_default(),
tracked: root_search_aux(
searcher_context,
&indexes_meta_for_leaf_search,
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-serve/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ itertools = { workspace = true }
mime_guess = { workspace = true }
once_cell = { workspace = true }
percent-encoding = { workspace = true }
pin-project = { workspace = true }
pprof = { workspace = true, optional = true }
prost = { workspace = true }
prost-types = { workspace = true }
Expand Down
30 changes: 24 additions & 6 deletions quickwit/quickwit-serve/src/elasticsearch_api/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use bytes::Bytes;
use bytesize::ByteSize;
use http::HeaderValue;
use serde::de::DeserializeOwned;
use warp::reject::LengthRequired;
use warp::{Filter, Rejection};
Expand Down Expand Up @@ -165,14 +166,21 @@ pub(crate) fn elastic_resolve_index_filter()
}

#[utoipa::path(get, tag = "Count", path = "/{index}/_count")]
pub(crate) fn elastic_index_count_filter()
-> impl Filter<Extract = (Vec<String>, SearchQueryParamsCount, SearchBody), Error = Rejection> + Clone
{
pub(crate) fn elastic_index_count_filter() -> impl Filter<
Extract = (
Vec<String>,
SearchQueryParamsCount,
SearchBody,
Option<HeaderValue>,
),
Error = Rejection,
> + Clone {
warp::path!("_elastic" / String / "_count")
.and_then(extract_index_id_patterns)
.and(warp::get().or(warp::post()).unify())
.and(warp::query())
.and(json_or_empty())
.and(warp::header::optional::<HeaderValue>("user-agent"))
}

#[utoipa::path(delete, tag = "Indexes", path = "/{index}")]
Expand Down Expand Up @@ -222,23 +230,33 @@ pub(crate) fn elastic_cat_indices_filter()
}

#[utoipa::path(get, tag = "Search", path = "/{index}/_search")]
pub(crate) fn elastic_index_search_filter()
-> impl Filter<Extract = (Vec<String>, SearchQueryParams, SearchBody), Error = Rejection> + Clone {
pub(crate) fn elastic_index_search_filter() -> impl Filter<
Extract = (
Vec<String>,
SearchQueryParams,
SearchBody,
Option<HeaderValue>,
),
Error = Rejection,
> + Clone {
warp::path!("_elastic" / String / "_search")
.and_then(extract_index_id_patterns)
.and(warp::get().or(warp::post()).unify())
.and(warp::query())
.and(json_or_empty())
.and(warp::header::optional::<HeaderValue>("user-agent"))
}
Comment thread
rdettai-sk marked this conversation as resolved.

#[utoipa::path(post, tag = "Search", path = "/_msearch")]
pub(crate) fn elastic_multi_search_filter()
-> impl Filter<Extract = (Bytes, MultiSearchQueryParams), Error = Rejection> + Clone {
-> impl Filter<Extract = (Bytes, MultiSearchQueryParams, Option<HeaderValue>), Error = Rejection> + Clone
{
warp::path!("_elastic" / "_msearch")
.and(warp::body::content_length_limit(BODY_LENGTH_LIMIT.as_u64()))
.and(warp::body::bytes())
.and(warp::post())
.and(warp::query())
.and(warp::header::optional::<HeaderValue>("user-agent"))
}
Comment thread
rdettai-sk marked this conversation as resolved.

fn merge_scroll_body_params(
Expand Down
18 changes: 14 additions & 4 deletions quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use bytes::Bytes;
use elasticsearch_dsl::search::Hit as ElasticHit;
use elasticsearch_dsl::{HitsMetadata, ShardStatistics, Source, TotalHits, TotalHitsRelation};
use futures_util::StreamExt;
use http::HeaderValue;
use itertools::Itertools;
use quickwit_cluster::Cluster;
use quickwit_common::truncate_str;
Expand Down Expand Up @@ -307,6 +308,7 @@ fn build_request_for_es_api(
index_id_patterns: Vec<String>,
search_params: SearchQueryParams,
search_body: SearchBody,
user_agent: Option<HeaderValue>,
) -> Result<(quickwit_proto::search::SearchRequest, bool), ElasticsearchError> {
let default_operator = search_params.default_operator.unwrap_or(BooleanOperand::Or);
// The query string, if present, takes priority over what can be in the request
Expand Down Expand Up @@ -413,6 +415,7 @@ fn build_request_for_es_api(
count_hits,
ignore_missing_indexes,
split_id: None,
user_agent: user_agent.and_then(|h| h.to_str().ok().map(str::to_owned)),
},
has_doc_id_field,
))
Expand Down Expand Up @@ -490,12 +493,13 @@ async fn es_compat_index_count(
index_id_patterns: Vec<String>,
search_params: SearchQueryParamsCount,
search_body: SearchBody,
user_agent: Option<HeaderValue>,
search_service: Arc<dyn SearchService>,
) -> Result<ElasticsearchCountResponse, ElasticsearchError> {
let mut search_params: SearchQueryParams = search_params.into();
search_params.track_total_hits = Some(TrackTotalHits::Track(true));
let (search_request, _append_shard_doc) =
build_request_for_es_api(index_id_patterns, search_params, search_body)?;
build_request_for_es_api(index_id_patterns, search_params, search_body, user_agent)?;
let search_response: SearchResponse = search_service.root_search(search_request).await?;
let search_response_rest: ElasticsearchCountResponse = ElasticsearchCountResponse {
count: search_response.num_hits,
Expand All @@ -507,6 +511,7 @@ async fn es_compat_index_search(
index_id_patterns: Vec<String>,
search_params: SearchQueryParams,
search_body: SearchBody,
user_agent: Option<HeaderValue>,
search_service: Arc<dyn SearchService>,
) -> Result<ElasticsearchResponse, ElasticsearchError> {
if search_params.scroll.is_some() && !search_params.allow_partial_search_results() {
Expand All @@ -520,7 +525,7 @@ async fn es_compat_index_search(
let start_instant = Instant::now();
let allow_partial_search_results = search_params.allow_partial_search_results();
let (search_request, append_shard_doc) =
build_request_for_es_api(index_id_patterns, search_params, search_body)?;
build_request_for_es_api(index_id_patterns, search_params, search_body, user_agent)?;
let search_response: SearchResponse = search_service.root_search(search_request).await?;
let elapsed = start_instant.elapsed();
let mut search_response_rest: ElasticsearchResponse = convert_to_es_search_response(
Expand Down Expand Up @@ -810,6 +815,7 @@ fn convert_hit(
async fn es_compat_index_multi_search(
payload: Bytes,
multi_search_params: MultiSearchQueryParams,
user_agent: Option<HeaderValue>,
search_service: Arc<dyn SearchService>,
) -> Result<MultiSearchResponse, ElasticsearchError> {
let mut search_requests = Vec::new();
Expand Down Expand Up @@ -864,8 +870,12 @@ async fn es_compat_index_multi_search(
if let Some(extra_filters) = &multi_search_params.extra_filters {
search_query_params.extra_filters = Some(extra_filters.to_vec());
}
let es_request =
build_request_for_es_api(index_ids_patterns, search_query_params, search_body)?;
let es_request = build_request_for_es_api(
index_ids_patterns,
search_query_params,
search_body,
user_agent.clone(),
)?;
search_requests.push(es_request);
}

Expand Down
8 changes: 4 additions & 4 deletions quickwit/quickwit-serve/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use quickwit_common::metrics::{
};

pub struct ServeMetrics {
pub http_requests_total: IntCounterVec<2>,
pub request_duration_secs: HistogramVec<2>,
pub http_requests_total: IntCounterVec<1>,
pub request_duration_secs: HistogramVec<1>,
pub ongoing_requests: IntGaugeVec<1>,
pub pending_requests: IntGaugeVec<1>,
pub circuit_break_total: IntCounter,
Expand All @@ -40,14 +40,14 @@ impl Default for ServeMetrics {
"Total number of HTTP requests processed.",
"",
&[],
["method", "status_code"],
["status_code"],
),
request_duration_secs: new_histogram_vec(
"request_duration_secs",
"Response time in seconds",
"",
&[],
["method", "status_code"],
["status_code"],
// last bucket is 163.84s
quickwit_common::metrics::exponential_buckets(0.02, 2.0, 14).unwrap(),
),
Expand Down
Loading
Loading