Skip to content

Commit 550e42a

Browse files
committed
update code format
1 parent aecd0bf commit 550e42a

File tree

5 files changed

+38
-42
lines changed

5 files changed

+38
-42
lines changed

src/runtime/input/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@
1111
// limitations under the License.
1212

1313
mod input_protocol;
14-
mod interface;
1514
mod input_provider;
1615
mod input_runner;
16+
mod interface;
1717
pub mod protocol;
1818

19-
pub use interface::{Input, InputState};
2019
pub use input_provider::InputProvider;
2120
pub use input_runner::InputRunner;
21+
pub use interface::{Input, InputState};

src/runtime/output/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@
1010
// See the License for the specific language governing permissions and
1111
// limitations under the License.
1212

13-
mod output_protocol;
1413
mod interface;
14+
mod output_protocol;
1515
mod output_provider;
1616
mod output_runner;
1717
mod protocol;
1818

19+
pub use interface::Output;
1920
#[allow(unused_imports)]
2021
pub use output_protocol::OutputProtocol;
21-
pub use interface::Output;
2222
pub use output_provider::OutputProvider;
2323
#[allow(unused_imports)]
2424
pub use output_runner::OutputRunner;

src/runtime/output/output_runner.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,11 @@ impl<P: OutputProtocol> Output for OutputRunner<P> {
306306
let output_id = self.output_id;
307307

308308
let thread_handle = thread::Builder::new()
309-
.name(format!("output-runner-{}-{}", protocol.name(), self.output_id))
309+
.name(format!(
310+
"output-runner-{}-{}",
311+
protocol.name(),
312+
self.output_id
313+
))
310314
.spawn(move || {
311315
Self::worker_loop(protocol, d_r, c_r, c_s, state, mail_box, output_id);
312316
})

src/runtime/processor/wasm/wasm_host.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -410,12 +410,9 @@ impl HostIterator for HostState {
410410
impl functionstream::core::collector::Host for HostState {
411411
fn emit(&mut self, target_id: u32, data: Vec<u8>) {
412412
let output_count = self.outputs.len();
413-
let out = self
414-
.outputs
415-
.get_mut(target_id as usize)
416-
.unwrap_or_else(|| {
417-
panic!("Invalid target_id: {target_id}, available outputs: {output_count}");
418-
});
413+
let out = self.outputs.get_mut(target_id as usize).unwrap_or_else(|| {
414+
panic!("Invalid target_id: {target_id}, available outputs: {output_count}");
415+
});
419416

420417
let buffer_or_event =
421418
BufferOrEvent::new_buffer(data, Some(format!("target_{}", target_id)), false, false);
@@ -427,12 +424,9 @@ impl functionstream::core::collector::Host for HostState {
427424

428425
fn emit_watermark(&mut self, target_id: u32, ts: u64) {
429426
let output_count = self.outputs.len();
430-
let out = self
431-
.outputs
432-
.get_mut(target_id as usize)
433-
.unwrap_or_else(|| {
434-
panic!("Invalid target_id: {target_id}, available outputs: {output_count}");
435-
});
427+
let out = self.outputs.get_mut(target_id as usize).unwrap_or_else(|| {
428+
panic!("Invalid target_id: {target_id}, available outputs: {output_count}");
429+
});
436430

437431
let mut watermark_data = Vec::with_capacity(12);
438432
watermark_data.extend_from_slice(&target_id.to_le_bytes());

src/runtime/processor/wasm/wasm_processor.rs

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -464,31 +464,26 @@ impl WasmProcessor for WasmProcessorImpl {
464464
.first()
465465
.map(|(_, b)| b.as_slice())
466466
.unwrap_or(&[]);
467-
create_wasm_host(
468-
first_bytes,
469-
outputs,
470-
init_context,
471-
task_name,
472-
create_time,
473-
)
474-
.map_err(|e| -> Box<dyn Error + Send> {
475-
let error_msg = format!("Failed to create WasmHost: {}", e);
476-
log::error!("{}", error_msg);
477-
let mut full_error = error_msg.clone();
478-
let mut source = e.source();
479-
let mut depth = 0;
480-
while let Some(err) = source {
481-
depth += 1;
482-
full_error.push_str(&format!("\n Caused by ({}): {}", depth, err));
483-
source = err.source();
484-
if depth > 10 {
485-
full_error.push_str("\n ... (error chain too long, truncated)");
486-
break;
467+
create_wasm_host(first_bytes, outputs, init_context, task_name, create_time).map_err(
468+
|e| -> Box<dyn Error + Send> {
469+
let error_msg = format!("Failed to create WasmHost: {}", e);
470+
log::error!("{}", error_msg);
471+
let mut full_error = error_msg.clone();
472+
let mut source = e.source();
473+
let mut depth = 0;
474+
while let Some(err) = source {
475+
depth += 1;
476+
full_error.push_str(&format!("\n Caused by ({}): {}", depth, err));
477+
source = err.source();
478+
if depth > 10 {
479+
full_error.push_str("\n ... (error chain too long, truncated)");
480+
break;
481+
}
487482
}
488-
}
489-
log::error!("Full error chain:\n{}", full_error);
490-
Box::new(WasmProcessorError::InitError(full_error))
491-
})?
483+
log::error!("Full error chain:\n{}", full_error);
484+
Box::new(WasmProcessorError::InitError(full_error))
485+
},
486+
)?
492487
};
493488

494489
*self.processor.borrow_mut() = Some(processor);
@@ -616,7 +611,10 @@ impl WasmProcessor for WasmProcessorImpl {
616611
Ok(())
617612
}
618613

619-
fn finish_checkpoint_outputs(&mut self, checkpoint_id: u64) -> Result<(), Box<dyn Error + Send>> {
614+
fn finish_checkpoint_outputs(
615+
&mut self,
616+
checkpoint_id: u64,
617+
) -> Result<(), Box<dyn Error + Send>> {
620618
let mut store_ref = self.store.borrow_mut();
621619
let store = store_ref.as_mut().ok_or_else(|| -> Box<dyn Error + Send> {
622620
Box::new(WasmProcessorError::InitError(

0 commit comments

Comments
 (0)