Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ dist
*.rpm
# intellij
.idea/
*.pftrace
104 changes: 103 additions & 1 deletion src/bin.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use anyhow::{Result, anyhow};
use backtrace::Backtrace;
use chrono::TimeDelta;
use flexi_logger::{FileSpec, LogSpecification, Logger};
use std::ffi::OsString;
use std::panic::{self, PanicHookInfo};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use cursive::view::Resizable;

use crate::{
interpreter::{ClickHouse, Context, ContextArc, options},
interpreter::{
ClickHouse, Context, ContextArc, Query, fetch_and_populate_perfetto_trace,
fetch_server_perfetto_sources, options, perfetto::PerfettoTraceBuilder,
},
view::Navigation,
};

Expand Down Expand Up @@ -39,6 +44,98 @@ fn panic_hook(info: &PanicHookInfo<'_>) {
);
}

fn derive_output_path(
user_path: Option<&Path>,
is_server_scope: bool,
query_id: Option<&str>,
) -> PathBuf {
if let Some(p) = user_path {
return p.to_path_buf();
}
if is_server_scope {
PathBuf::from("server_perfetto_trace.pftrace")
} else {
// query_id presence is guaranteed by the !is_server_scope branch in the caller
PathBuf::from(format!("{}.pftrace", query_id.unwrap()))
}
}

async fn run_cli_perfetto_export(
options: &options::ChDigOptions,
clickhouse: &Arc<ClickHouse>,
) -> Result<()> {
let cmd = options
.perfetto_command()
.expect("run_cli_perfetto_export requires the perfetto export subcommand");

let perfetto_options = cmd.apply(options.perfetto.clone());

let is_server_scope = options.view.query_id.is_none();
let view_start = options.view.start.clone().into();
let view_end = options.view.end.clone().into();
let mut scope = match &options.view.query_id {
Some(query_id) => {
clickhouse
.get_perfetto_query_scope(query_id, view_start, view_end)
.await?
}
None => crate::interpreter::clickhouse::PerfettoQueryScope {
start: view_start,
end: view_end,
query_ids: None,
},
};
// Match TUI behavior: include events that arrived in the same second as the query end.
scope.end += TimeDelta::seconds(1);

let query_block = clickhouse
.get_queries_for_perfetto(scope.start, scope.end, &scope.query_ids)
.await?;
let mut queries = Vec::new();
for i in 0..query_block.row_count() {
match Query::from_clickhouse_block(&query_block, i, false) {
Ok(q) => queries.push(q),
Err(e) => log::warn!("Perfetto: failed to parse query row {}: {}", i, e),
}
}

let mut builder = PerfettoTraceBuilder::new(
perfetto_options.per_server,
perfetto_options.text_log_android,
);
builder.add_queries(&queries);
fetch_and_populate_perfetto_trace(
clickhouse,
&mut builder,
&perfetto_options,
scope.query_ids.as_deref(),
scope.start,
scope.end,
)
.await;

if is_server_scope {
fetch_server_perfetto_sources(
clickhouse,
&mut builder,
&perfetto_options,
scope.start,
scope.end,
)
.await;
}

let output = derive_output_path(
options.view.output.as_deref(),
is_server_scope,
options.view.query_id.as_deref(),
);

std::fs::write(&output, builder.build())?;
println!("Perfetto trace exported to {}", output.display());
Ok(())
}

pub async fn chdig_main_async<I, T>(itr: I) -> Result<()>
where
I: IntoIterator<Item = T>,
Expand All @@ -61,6 +158,11 @@ where
// panic hook will clear the screen).
let clickhouse = Arc::new(ClickHouse::new(options.clickhouse.clone()).await?);

if options.perfetto_command().is_some() {
run_cli_perfetto_export(&options, &clickhouse).await?;
return Ok(());
}

let server_warnings = match clickhouse.get_warnings().await {
Ok(w) => w,
Err(e) => {
Expand Down
63 changes: 63 additions & 0 deletions src/interpreter/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ pub struct TextLogArguments {
pub end: RelativeDateTime,
}

#[derive(Debug, Clone)]
pub struct PerfettoQueryScope {
pub query_ids: Option<Vec<String>>,
pub start: DateTime<Local>,
pub end: DateTime<Local>,
}

#[derive(Default)]
pub struct ClickHouseServerCPU {
pub count: u64,
Expand Down Expand Up @@ -1589,6 +1596,7 @@ impl ClickHouse {
&self,
start: DateTime<Local>,
end: DateTime<Local>,
query_ids: &Option<Vec<String>>,
) -> Result<Columns> {
let dbtable = self.get_log_table_name("system", "query_log");
return self
Expand Down Expand Up @@ -1620,6 +1628,7 @@ impl ClickHouse {
WHERE type != 'QueryStart'
AND event_date >= toDate(start_) AND event_time >= toDateTime(start_)
AND event_date <= toDate(end_) AND event_time <= toDateTime(end_)
{query_ids}
"#,
start = start
.timestamp_nanos_opt()
Expand All @@ -1634,12 +1643,66 @@ impl ClickHouse {
} else {
"length(thread_ids)"
},
query_ids = if let Some(query_id) = query_ids {
format!("AND query_id IN ('{}')", query_id.join("','"))
} else {
String::new()
},
)
.as_str(),
)
.await;
}

pub async fn get_perfetto_query_scope(
&self,
query_id: &str,
start: DateTime<Local>,
end: DateTime<Local>,
) -> Result<PerfettoQueryScope> {
let query_log = self.get_log_table_name("system", "query_log");
let query_id_escaped = query_id.replace('\'', "''");
let block = self
.execute(
format!(
r#"
WITH
'{query_id}' AS root_query_id,
fromUnixTimestamp64Nano({start}) AS start_,
fromUnixTimestamp64Nano({end}) AS end_
SELECT
groupUniqArray(query_id) AS query_ids,
min(query_start_time_microseconds) AS query_start_time_microseconds,
max(event_time_microseconds) AS query_end_time_microseconds
FROM {query_log}
WHERE
type != 'QueryStart'
AND (query_id = root_query_id OR initial_query_id = root_query_id)
AND event_date >= toDate(start_) AND event_time >= toDateTime(start_)
AND event_date <= toDate(end_) AND event_time <= toDateTime(end_)
"#,
query_id = query_id_escaped,
query_log = query_log,
start = start
.timestamp_nanos_opt()
.ok_or(Error::msg("Invalid start"))?,
end = end.timestamp_nanos_opt().ok_or(Error::msg("Invalid end"))?,
)
.as_str(),
)
.await?;

return Ok(PerfettoQueryScope {
query_ids: Some(block.get::<Vec<String>, _>(0, "query_ids")?),
start: block
.get::<DateTime<Tz>, _>(0, "query_start_time_microseconds")?
.with_timezone(&Local),
end: block
.get::<DateTime<Tz>, _>(0, "query_end_time_microseconds")?
.with_timezone(&Local),
});
}

pub async fn get_metric_log_for_perfetto(
&self,
start: DateTime<Local>,
Expand Down
1 change: 1 addition & 0 deletions src/interpreter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub use clickhouse_quirks::ClickHouseQuirks;
pub use context::Context;
pub use context::ContextArc;
pub use worker::Worker;
pub(crate) use worker::{fetch_and_populate_perfetto_trace, fetch_server_perfetto_sources};

pub type WorkerEvent = worker::Event;
pub type Query = query::Query;
Expand Down
Loading
Loading