Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions openless-all/app/src-tauri/src/asr/local/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ impl LocalAsrCache {
slot.take();
}
}
log::info!("[local-asr cache] loading {model_id} from {}", model_dir.display());
log::info!(
"[local-asr cache] loading {model_id} from {}",
model_dir.display()
);
let engine = Arc::new(QwenAsrEngine::load(model_dir)?);
let mut slot = self.inner.lock();
*slot = Some(CachedEngine {
Expand Down Expand Up @@ -124,7 +127,10 @@ impl LocalAsrCache {
{
let taken = self.inner.lock().take();
if let Some(cached) = taken {
log::info!("[local-asr cache] release engine {} on demand", cached.model_id);
log::info!(
"[local-asr cache] release engine {} on demand",
cached.model_id
);
drop(cached);
pressure_relief_macos();
}
Expand Down
87 changes: 70 additions & 17 deletions openless-all/app/src-tauri/src/asr/local/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,20 @@ fn keep_file(path: &str) -> bool {
return false;
}
let lower = path.to_ascii_lowercase();
if lower.ends_with(".md") || lower.ends_with(".png") || lower.ends_with(".jpg")
|| lower.ends_with(".jpeg") || lower.ends_with(".gif") || lower.ends_with(".svg")
if lower.ends_with(".md")
|| lower.ends_with(".png")
|| lower.ends_with(".jpg")
|| lower.ends_with(".jpeg")
|| lower.ends_with(".gif")
|| lower.ends_with(".svg")
{
return false;
}
let ext = lower.rsplit('.').next().unwrap_or("");
matches!(ext, "json" | "safetensors" | "txt" | "bin" | "model" | "tiktoken")
matches!(
ext,
"json" | "safetensors" | "txt" | "bin" | "model" | "tiktoken"
)
}

#[derive(Debug, Clone, Serialize)]
Expand Down Expand Up @@ -205,7 +212,10 @@ impl DownloadManager {
flag.store(true, Ordering::SeqCst);
log::info!("[local-asr] cancel requested for {}", model_id.as_str());
} else {
log::info!("[local-asr] cancel requested for {} but no active download", model_id.as_str());
log::info!(
"[local-asr] cancel requested for {} but no active download",
model_id.as_str()
);
}
}

Expand Down Expand Up @@ -289,9 +299,8 @@ async fn run_download(
}
}

let in_flight_bytes: Arc<Vec<AtomicU64>> = Arc::new(
info.files.iter().map(|_| AtomicU64::new(0)).collect()
);
let in_flight_bytes: Arc<Vec<AtomicU64>> =
Arc::new(info.files.iter().map(|_| AtomicU64::new(0)).collect());
let already_done_bytes: u64 = info
.files
.iter()
Expand Down Expand Up @@ -480,7 +489,13 @@ async fn download_one(
// 远端文件 ≤ 一个 chunk 大小:直接单 chunk,不走 sparse + idx
if total_size <= CHUNK_SIZE {
let result = chunk_with_retry(
client, url, &partial, 0, total_size - 1, &cancel, &on_progress,
client,
url,
&partial,
0,
total_size - 1,
&cancel,
&on_progress,
)
.await;
if cancel.load(Ordering::SeqCst) {
Expand All @@ -499,7 +514,8 @@ async fn download_one(
let done_set = read_idx(&idx_path);

// 3. 预先把 .partial 撑到最终大小(sparse 文件,holes = 零字节)
if !partial.exists() || std::fs::metadata(&partial).map(|m| m.len()).unwrap_or(0) != total_size {
if !partial.exists() || std::fs::metadata(&partial).map(|m| m.len()).unwrap_or(0) != total_size
{
let f = std::fs::OpenOptions::new()
.write(true)
.create(true)
Expand Down Expand Up @@ -630,7 +646,10 @@ fn read_idx(path: &Path) -> HashSet<usize> {

fn append_idx(path: &Path, idx: usize) -> std::io::Result<()> {
use std::io::Write;
let mut f = std::fs::OpenOptions::new().create(true).append(true).open(path)?;
let mut f = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)?;
writeln!(f, "{idx}")
}

Expand All @@ -657,7 +676,17 @@ async fn chunk_with_retry(
if cancel.load(Ordering::SeqCst) {
anyhow::bail!("cancelled");
}
match try_download_range_append(client, url, partial, range_start, range_end, cancel, on_progress).await {
match try_download_range_append(
client,
url,
partial,
range_start,
range_end,
cancel,
on_progress,
)
.await
{
Ok(()) => return Ok(()),
Err(e) => {
let msg = format!("{e:#}");
Expand All @@ -673,7 +702,8 @@ async fn chunk_with_retry(
}
}
}
Err(last_err.unwrap_or_else(|| anyhow::anyhow!("chunk failed after {PER_CHUNK_ATTEMPTS} attempts")))
Err(last_err
.unwrap_or_else(|| anyhow::anyhow!("chunk failed after {PER_CHUNK_ATTEMPTS} attempts")))
}

async fn try_download_range_append(
Expand All @@ -687,12 +717,19 @@ async fn try_download_range_append(
) -> Result<()> {
let mut req = client.get(url);
req = req.header("Range", format!("bytes={range_start}-{range_end}"));
let resp = req.send().await.with_context(|| format!("HTTP GET {url} failed"))?;
let resp = req
.send()
.await
.with_context(|| format!("HTTP GET {url} failed"))?;
let status = resp.status();
if status.as_u16() != 200 && status.as_u16() != 206 {
anyhow::bail!("HTTP {status} for {url}");
}
let effective_start = if status.as_u16() == 200 { 0 } else { range_start };
let effective_start = if status.as_u16() == 200 {
0
} else {
range_start
};

// 截断 partial 到本次 attempt 的起点,再 seek 写入。
// 老 append 实现的 bug:若上一次 attempt 已写了部分字节后失败,retry 拿到的还是
Expand Down Expand Up @@ -744,7 +781,18 @@ async fn chunk_with_retry_seek(
if cancel.load(Ordering::SeqCst) {
anyhow::bail!("cancelled");
}
match try_download_range_seek(client, url, partial, range_start, range_end, cancel, bytes_in_file, on_progress).await {
match try_download_range_seek(
client,
url,
partial,
range_start,
range_end,
cancel,
bytes_in_file,
on_progress,
)
.await
{
Ok(()) => return Ok(()),
Err(e) => {
let msg = format!("{e:#}");
Expand All @@ -760,7 +808,11 @@ async fn chunk_with_retry_seek(
}
}
}
Err(last_err.unwrap_or_else(|| anyhow::anyhow!("chunk [{range_start}-{range_end}] failed after {PER_CHUNK_ATTEMPTS} attempts")))
Err(last_err.unwrap_or_else(|| {
anyhow::anyhow!(
"chunk [{range_start}-{range_end}] failed after {PER_CHUNK_ATTEMPTS} attempts"
)
}))
}

async fn try_download_range_seek(
Expand Down Expand Up @@ -806,7 +858,8 @@ async fn try_download_range_seek(
}
let bytes = chunk.context("read stream chunk failed")?;
file.write_all(&bytes).await.context("write chunk failed")?;
let new_total = bytes_in_file.fetch_add(bytes.len() as u64, Ordering::Relaxed) + bytes.len() as u64;
let new_total =
bytes_in_file.fetch_add(bytes.len() as u64, Ordering::Relaxed) + bytes.len() as u64;
on_progress(new_total);
}
file.flush().await.ok();
Expand Down
9 changes: 5 additions & 4 deletions openless-all/app/src-tauri/src/asr/local/local_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ impl LocalQwenAsr {
// 走 Tauri 持有的 runtime handle,不依赖调用方上下文(虽然这里目前都在 async 路径上调,
// 但保持一致更稳)。
let engine = Arc::clone(&self.engine);
let text = tauri::async_runtime::spawn_blocking(move || engine.transcribe_stream(&samples_f32))
.await
.context("transcribe spawn_blocking join 失败")?
.context("qwen_transcribe_stream 失败")?;
let text =
tauri::async_runtime::spawn_blocking(move || engine.transcribe_stream(&samples_f32))
.await
.context("transcribe spawn_blocking join 失败")?
.context("qwen_transcribe_stream 失败")?;

// 解绑回调,避免 idle 期 C 端任何后续触发。
self.engine.set_token_handler::<fn(&str)>(None);
Expand Down
4 changes: 2 additions & 2 deletions openless-all/app/src-tauri/src/asr/local/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ mod qwen_engine;
#[cfg(target_os = "macos")]
mod qwen_ffi;

#[cfg(target_os = "macos")]
pub use qwen_engine::QwenAsrEngine;
#[cfg(target_os = "macos")]
pub use local_provider::LocalQwenAsr;
#[cfg(target_os = "macos")]
pub use qwen_engine::QwenAsrEngine;

pub use download::{DownloadManager, Mirror};
pub use models::{ModelId, ModelStatus};
Expand Down
10 changes: 4 additions & 6 deletions openless-all/app/src-tauri/src/asr/local/qwen_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ impl QwenAsrEngine {
/// 批式转写:一次性给完整音频(mono f32 16kHz)。
pub fn transcribe_audio(&self, samples: &[f32]) -> Result<String> {
// SAFETY: samples 在调用期间存活;返回是 C `malloc` 出的字符串。
let raw = unsafe {
qwen_transcribe_audio(self.ctx, samples.as_ptr(), samples.len() as i32)
};
let raw =
unsafe { qwen_transcribe_audio(self.ctx, samples.as_ptr(), samples.len() as i32) };
if raw.is_null() {
anyhow::bail!("qwen_transcribe_audio 返回 NULL");
}
Expand All @@ -95,9 +94,8 @@ impl QwenAsrEngine {
/// 流式转写:内部按 2s chunk 切片,token 通过 `set_token_handler` 注册的
/// 回调实时吐出;返回值是最终完整文本。
pub fn transcribe_stream(&self, samples: &[f32]) -> Result<String> {
let raw = unsafe {
qwen_transcribe_stream(self.ctx, samples.as_ptr(), samples.len() as i32)
};
let raw =
unsafe { qwen_transcribe_stream(self.ctx, samples.as_ptr(), samples.len() as i32) };
if raw.is_null() {
anyhow::bail!("qwen_transcribe_stream 返回 NULL");
}
Expand Down
29 changes: 15 additions & 14 deletions openless-all/app/src-tauri/src/asr/local/test_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,18 @@ pub async fn run_test(model_id: ModelId) -> Result<TestResult> {
// qwen_load 是同步阻塞调用且较慢(数秒);扔到 spawn_blocking 不阻塞 tokio runtime。
let load_start = Instant::now();
let dir_for_blocking = dir.clone();
let engine = tauri::async_runtime::spawn_blocking(move || {
load_engine(&dir_for_blocking)
})
.await
.map_err(|e| anyhow::anyhow!("spawn_blocking join failed: {e:#}"))??;
let engine = tauri::async_runtime::spawn_blocking(move || load_engine(&dir_for_blocking))
.await
.map_err(|e| anyhow::anyhow!("spawn_blocking join failed: {e:#}"))??;
let load_ms = load_start.elapsed().as_millis() as u64;

// transcribe_audio 也是阻塞 + 重活,同样扔到 blocking pool。
let trans_start = Instant::now();
let engine_clone = Arc::clone(&engine);
let text = tauri::async_runtime::spawn_blocking(move || {
engine_clone.transcribe_audio(&samples)
})
.await
.map_err(|e| anyhow::anyhow!("spawn_blocking join failed: {e:#}"))??;
let text =
tauri::async_runtime::spawn_blocking(move || engine_clone.transcribe_audio(&samples))
.await
.map_err(|e| anyhow::anyhow!("spawn_blocking join failed: {e:#}"))??;
let transcribe_ms = trans_start.elapsed().as_millis() as u64;

Ok(TestResult {
Expand Down Expand Up @@ -113,13 +110,17 @@ fn decode_wav_16k_mono(bytes: &[u8]) -> Result<Vec<f32>> {
if body_start + 16 > bytes.len() {
anyhow::bail!("fmt chunk 越界");
}
let format = u16::from_le_bytes(bytes[body_start..body_start + 2].try_into().unwrap());
let format =
u16::from_le_bytes(bytes[body_start..body_start + 2].try_into().unwrap());
if format != 1 {
anyhow::bail!("只支持 PCM(format=1),当前 format={format}");
}
channels = u16::from_le_bytes(bytes[body_start + 2..body_start + 4].try_into().unwrap());
sample_rate = u32::from_le_bytes(bytes[body_start + 4..body_start + 8].try_into().unwrap());
bits_per_sample = u16::from_le_bytes(bytes[body_start + 14..body_start + 16].try_into().unwrap());
channels =
u16::from_le_bytes(bytes[body_start + 2..body_start + 4].try_into().unwrap());
sample_rate =
u32::from_le_bytes(bytes[body_start + 4..body_start + 8].try_into().unwrap());
bits_per_sample =
u16::from_le_bytes(bytes[body_start + 14..body_start + 16].try_into().unwrap());
}
b"data" => {
data_offset = body_start;
Expand Down
Loading
Loading