diff --git a/crates/connect/src/session.rs b/crates/connect/src/session.rs index 94184a7..5e06160 100644 --- a/crates/connect/src/session.rs +++ b/crates/connect/src/session.rs @@ -63,7 +63,8 @@ impl Default for SparkSessionBuilder { impl SparkSessionBuilder { fn new(connection: &str) -> Self { - let channel_builder = ChannelBuilder::create(connection).unwrap(); + let channel_builder = ChannelBuilder::create(connection) + .expect("Failed to parse the Spark Connect connection string"); Self { channel_builder, @@ -207,9 +208,17 @@ impl SparkSession { .execute_command_and_fetch(plan) .await?; - let relation = resp.sql_command_result.to_owned().unwrap().relation; - - let logical_plan = LogicalPlanBuilder::new(relation.unwrap()); + let relation = resp + .sql_command_result + .to_owned() + .ok_or(SparkError::AnalysisException( + "SQL command result is empty".to_string(), + ))? + .relation; + + let logical_plan = LogicalPlanBuilder::new(relation.ok_or( + SparkError::AnalysisException("SQL relation result is empty".to_string()), + )?); Ok(DataFrame::new(self.session(), logical_plan)) } diff --git a/crates/connect/src/streaming/mod.rs b/crates/connect/src/streaming/mod.rs index 5feb4ce..7249f80 100644 --- a/crates/connect/src/streaming/mod.rs +++ b/crates/connect/src/streaming/mod.rs @@ -233,7 +233,14 @@ impl DataStreamWriter { format: self.format.unwrap_or("".to_string()), options: self.write_options, partitioning_column_names: self.partition_by, - output_mode: self.output_mode.unwrap().as_str_name().to_string(), + output_mode: self + .output_mode + .as_ref() + .ok_or(SparkError::AnalysisException( + "output_mode is not set".to_string(), + ))? + .as_str_name() + .to_string(), query_name: self.query_name.unwrap_or("".to_string()), foreach_batch: None, foreach_writer: None, @@ -252,10 +259,11 @@ impl DataStreamWriter { .await? .write_stream_operation_start_result; - Ok(StreamingQuery::new( - self.dataframe.spark_session, - operation_start_resp.unwrap(), - )) + let operation_start_resp = operation_start_resp.ok_or(SparkError::AnalysisException( + "write stream operation start result is empty".to_string(), + ))?; + + StreamingQuery::new(self.dataframe.spark_session, operation_start_resp) } /// Start a streaming job to save the contents of the [StreamingQuery] to a data source. @@ -293,14 +301,16 @@ impl StreamingQuery { pub fn new( spark_session: Box, write_stream: spark::WriteStreamOperationStartResult, - ) -> Self { - let query_instance = write_stream.query_id.unwrap(); + ) -> Result { + let query_instance = write_stream.query_id.ok_or(SparkError::AnalysisException( + "streaming query id is empty".to_string(), + ))?; - Self { + Ok(Self { spark_session, query_instance, name: Some(write_stream.name), - } + }) } fn streaming_query_cmd() -> spark::StreamingQueryCommand { @@ -565,7 +575,9 @@ impl StreamingQuery { } fn to_json_object(val: Vec) -> Result { - let val = &val.first().unwrap(); + let val = val.first().ok_or(SparkError::AnalysisException( + "empty progress response".to_string(), + ))?; Ok(serde_json::from_str::(val)?) } @@ -636,7 +648,9 @@ impl StreamingQueryManager { for stream in active_result.active_queries { let query = StreamingQuery { spark_session: self.spark_session.clone(), - query_instance: stream.id.clone().unwrap(), + query_instance: stream.id.clone().ok_or(SparkError::AnalysisException( + "streaming query instance id is empty".to_string(), + ))?, name: stream.name, };