Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ repository = "https://github.com/sundy-li/arrow_cli"
edition = "2024"
license = "Apache-2.0"
name = "arrow_cli"
version = "0.3.0"
version = "0.3.1"


[dependencies]
Expand Down
7 changes: 4 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ struct Args {
help = "Execute query using prepared statement"
)]
prepared: bool,

#[clap(long, default_value = "false", help = "Print resultset schema")]
print_schema: bool,
}

#[tokio::main]
Expand All @@ -50,9 +53,7 @@ pub async fn main() -> Result<(), ArrowError> {
let url = format!("{protocol}://{}:{}", args.host, args.port);
let endpoint = endpoint(&args, url)?;
let is_repl = atty::is(Stream::Stdin);
let mut session =
session::Session::try_new(endpoint, &args.user, &args.password, is_repl, args.prepared)
.await?;
let mut session = session::Session::try_new(endpoint, is_repl, args).await?;

session.handle().await;
Ok(())
Expand Down
29 changes: 15 additions & 14 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,20 @@ use std::io::BufRead;
use tokio::time::Instant;
use tonic::transport::{Channel, Endpoint};

use crate::helper::CliHelper;
use crate::{Args, helper::CliHelper};

pub struct Session {
client: FlightSqlServiceClient<Channel>,
is_repl: bool,
prompt: String,
prepared: bool,
args: Args,
}

impl Session {
pub async fn try_new(
endpoint: Endpoint,
user: &str,
password: &str,
is_repl: bool,
prepared: bool,
args: Args,
) -> Result<Self, ArrowError> {
let channel = endpoint
.connect()
Expand All @@ -37,21 +35,21 @@ impl Session {

if is_repl {
println!("Welcome to Arrow CLI v{}.", env!("CARGO_PKG_VERSION"));
println!("Connecting to {} as user {}.", endpoint.uri(), user);
println!("Connecting to {} as user {}.", endpoint.uri(), args.user);
println!();
}

let mut client = FlightSqlServiceClient::new_from_inner(
FlightServiceClient::new(channel).max_decoding_message_size(usize::MAX),
);
let _token = client.handshake(user, password).await?;
let _token = client.handshake(&args.user, &args.password).await?;

let prompt = format!("{} :) ", endpoint.uri().host().unwrap());
Ok(Self {
client,
is_repl,
prompt,
prepared,
args,
})
}

Expand Down Expand Up @@ -130,7 +128,7 @@ impl Session {
}

let start = Instant::now();
let flight_info = if self.prepared {
let flight_info = if self.args.prepared {
let mut stmt = self.client.prepare(query.to_string(), None).await?;
let info = stmt.execute().await?;
stmt.close().await?;
Expand All @@ -142,7 +140,7 @@ impl Session {
let mut batches: Vec<RecordBatch> = Vec::new();

let mut handles = Vec::with_capacity(flight_info.endpoint.len());
for endpoint in flight_info.endpoint {
for endpoint in flight_info.endpoint.iter() {
let ticket = endpoint
.ticket
.as_ref()
Expand All @@ -166,17 +164,20 @@ impl Session {
if is_repl {
let res = pretty_format_batches(batches.as_slice())?;

println!("{res}");
println!();
println!("{res}\n");

if self.args.print_schema {
let schema = flight_info.try_decode_schema()?;
println!("{schema:#?}\n");
}

let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
println!(
"{} rows in set (tickets received in {:.3} sec, rows received in {:.3} sec)",
"{} rows in set (tickets received in {:.3} sec, rows received in {:.3} sec)\n",
rows,
ticket_recv_duration.as_secs_f64(),
rows_recv_duration.as_secs_f64(),
);
println!();
} else {
let res = print_batches_with_sep(batches.as_slice(), b'\t')?;
print!("{res}");
Expand Down