From d8b5a11807426b530ef1cf5de4fc4c95b0163b28 Mon Sep 17 00:00:00 2001 From: zhangcaixian <2369173394@qq.com> Date: Wed, 27 May 2026 11:12:13 +0800 Subject: [PATCH 1/5] feat: add rank0 async HF logs --- xtuner/v1/engine/train_engine.py | 11 ++++++----- xtuner/v1/model/base.py | 14 ++++++++++++++ xtuner/v1/model/compose/base.py | 14 ++++++++++++++ 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/xtuner/v1/engine/train_engine.py b/xtuner/v1/engine/train_engine.py index 184ee35db..9032332b8 100644 --- a/xtuner/v1/engine/train_engine.py +++ b/xtuner/v1/engine/train_engine.py @@ -318,11 +318,12 @@ def async_save_hf( save_dtype: torch.dtype = torch.bfloat16, cleanup_hf_dirs: Sequence[str | Path] = (), ) -> AsyncHFSaveHandle: - return self.model.async_save_hf( - hf_dir=hf_dir, - save_dtype=save_dtype, - cleanup_hf_dirs=cleanup_hf_dirs, - ) + with profile_time_and_memory(f"[Async saving HF to {hf_dir} launch cost]"): + return self.model.async_save_hf( + hf_dir=hf_dir, + save_dtype=save_dtype, + cleanup_hf_dirs=cleanup_hf_dirs, + ) def wait_async_hf(self, handle: AsyncHFSaveHandle | None = None) -> Path | None: return self.model.wait_async_hf(handle) diff --git a/xtuner/v1/model/base.py b/xtuner/v1/model/base.py index cad387a99..aa1c260e8 100644 --- a/xtuner/v1/model/base.py +++ b/xtuner/v1/model/base.py @@ -762,6 +762,14 @@ def _run_async_hf_writer( cleanup_done_path: Path, rank: int, ) -> None: + writer_start = time.time() + log_hf_dir = ( + tmp_hf_dir.with_name(tmp_hf_dir.name[: -len(".incomplete")]) + if tmp_hf_dir.name.endswith(".incomplete") + else tmp_hf_dir + ) + if rank == 0: + logger.info(f"[Async saving HF to {log_hf_dir} writer] started") try: set_async_save_process_qos() self._cleanup_async_hf_dirs_before_write( @@ -775,7 +783,13 @@ def _run_async_hf_writer( weight_map=weight_map, status_path=status_path, ) + if rank == 0: + elapsed = time.time() - writer_start + logger.info(f"[Async saving HF to {log_hf_dir} writer] finished in {elapsed:.2f}s") except Exception as exc: + if rank == 0: + elapsed = time.time() - writer_start + logger.error(f"[Async saving HF to {log_hf_dir} writer] failed after {elapsed:.2f}s: {exc}") status = {"rank": rank, "ok": False, "error": str(exc), "weight_map": {}} with status_path.open("w") as f: f.write(json.dumps(status, indent=2)) diff --git a/xtuner/v1/model/compose/base.py b/xtuner/v1/model/compose/base.py index 68c9957ba..08de1b900 100644 --- a/xtuner/v1/model/compose/base.py +++ b/xtuner/v1/model/compose/base.py @@ -258,6 +258,14 @@ def _run_async_hf_compose_writer( cleanup_done_path: Path, rank: int, ) -> None: + writer_start = time.time() + log_hf_dir = ( + tmp_hf_dir.with_name(tmp_hf_dir.name[: -len(".incomplete")]) + if tmp_hf_dir.name.endswith(".incomplete") + else tmp_hf_dir + ) + if rank == 0: + logger.info(f"[Async saving HF to {log_hf_dir} writer] started") try: set_async_save_process_qos() self._cleanup_async_hf_dirs_before_write( @@ -270,7 +278,13 @@ def _run_async_hf_compose_writer( status = {"rank": rank, "ok": True, "error": "", "weight_map": merged_weight_map} with status_path.open("w") as f: f.write(json.dumps(status, indent=2)) + if rank == 0: + elapsed = time.time() - writer_start + logger.info(f"[Async saving HF to {log_hf_dir} writer] finished in {elapsed:.2f}s") except Exception as exc: + if rank == 0: + elapsed = time.time() - writer_start + logger.error(f"[Async saving HF to {log_hf_dir} writer] failed after {elapsed:.2f}s: {exc}") status = {"rank": rank, "ok": False, "error": str(exc), "weight_map": {}} with status_path.open("w") as f: f.write(json.dumps(status, indent=2)) From 93b5ffe1bb73253584224a3527b970c659796bec Mon Sep 17 00:00:00 2001 From: zhangcaixian <2369173394@qq.com> Date: Wed, 27 May 2026 11:19:08 +0800 Subject: [PATCH 2/5] fix: import time for async HF compose logs --- xtuner/v1/model/compose/base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/xtuner/v1/model/compose/base.py b/xtuner/v1/model/compose/base.py index 08de1b900..b2341eb02 100644 --- a/xtuner/v1/model/compose/base.py +++ b/xtuner/v1/model/compose/base.py @@ -1,5 +1,6 @@ import json import multiprocessing as py_mp +import time from pathlib import Path from shutil import rmtree from typing import Callable, Self, Sequence From 98ad5637d343f5daa21734e29b17b8313bfaec50 Mon Sep 17 00:00:00 2001 From: zhangcaixian <2369173394@qq.com> Date: Wed, 27 May 2026 11:31:28 +0800 Subject: [PATCH 3/5] refactor: simplify async HF writer log path --- xtuner/v1/model/base.py | 11 +++-------- xtuner/v1/model/compose/base.py | 11 +++-------- 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/xtuner/v1/model/base.py b/xtuner/v1/model/base.py index aa1c260e8..a73a8f360 100644 --- a/xtuner/v1/model/base.py +++ b/xtuner/v1/model/base.py @@ -763,13 +763,8 @@ def _run_async_hf_writer( rank: int, ) -> None: writer_start = time.time() - log_hf_dir = ( - tmp_hf_dir.with_name(tmp_hf_dir.name[: -len(".incomplete")]) - if tmp_hf_dir.name.endswith(".incomplete") - else tmp_hf_dir - ) if rank == 0: - logger.info(f"[Async saving HF to {log_hf_dir} writer] started") + logger.info(f"[Async saving HF to {tmp_hf_dir} writer] started") try: set_async_save_process_qos() self._cleanup_async_hf_dirs_before_write( @@ -785,11 +780,11 @@ def _run_async_hf_writer( ) if rank == 0: elapsed = time.time() - writer_start - logger.info(f"[Async saving HF to {log_hf_dir} writer] finished in {elapsed:.2f}s") + logger.info(f"[Async saving HF to {tmp_hf_dir} writer] finished in {elapsed:.2f}s") except Exception as exc: if rank == 0: elapsed = time.time() - writer_start - logger.error(f"[Async saving HF to {log_hf_dir} writer] failed after {elapsed:.2f}s: {exc}") + logger.error(f"[Async saving HF to {tmp_hf_dir} writer] failed after {elapsed:.2f}s: {exc}") status = {"rank": rank, "ok": False, "error": str(exc), "weight_map": {}} with status_path.open("w") as f: f.write(json.dumps(status, indent=2)) diff --git a/xtuner/v1/model/compose/base.py b/xtuner/v1/model/compose/base.py index b2341eb02..fb99e150f 100644 --- a/xtuner/v1/model/compose/base.py +++ b/xtuner/v1/model/compose/base.py @@ -260,13 +260,8 @@ def _run_async_hf_compose_writer( rank: int, ) -> None: writer_start = time.time() - log_hf_dir = ( - tmp_hf_dir.with_name(tmp_hf_dir.name[: -len(".incomplete")]) - if tmp_hf_dir.name.endswith(".incomplete") - else tmp_hf_dir - ) if rank == 0: - logger.info(f"[Async saving HF to {log_hf_dir} writer] started") + logger.info(f"[Async saving HF to {tmp_hf_dir} writer] started") try: set_async_save_process_qos() self._cleanup_async_hf_dirs_before_write( @@ -281,11 +276,11 @@ def _run_async_hf_compose_writer( f.write(json.dumps(status, indent=2)) if rank == 0: elapsed = time.time() - writer_start - logger.info(f"[Async saving HF to {log_hf_dir} writer] finished in {elapsed:.2f}s") + logger.info(f"[Async saving HF to {tmp_hf_dir} writer] finished in {elapsed:.2f}s") except Exception as exc: if rank == 0: elapsed = time.time() - writer_start - logger.error(f"[Async saving HF to {log_hf_dir} writer] failed after {elapsed:.2f}s: {exc}") + logger.error(f"[Async saving HF to {tmp_hf_dir} writer] failed after {elapsed:.2f}s: {exc}") status = {"rank": rank, "ok": False, "error": str(exc), "weight_map": {}} with status_path.open("w") as f: f.write(json.dumps(status, indent=2)) From 182d039045715897575f0bb8686bb9c96c616df4 Mon Sep 17 00:00:00 2001 From: zhangcaixian <2369173394@qq.com> Date: Wed, 27 May 2026 11:37:12 +0800 Subject: [PATCH 4/5] refactor: rely on log timestamps for async HF writer --- xtuner/v1/model/base.py | 7 ++----- xtuner/v1/model/compose/base.py | 8 ++------ 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/xtuner/v1/model/base.py b/xtuner/v1/model/base.py index a73a8f360..ead53a6d2 100644 --- a/xtuner/v1/model/base.py +++ b/xtuner/v1/model/base.py @@ -762,7 +762,6 @@ def _run_async_hf_writer( cleanup_done_path: Path, rank: int, ) -> None: - writer_start = time.time() if rank == 0: logger.info(f"[Async saving HF to {tmp_hf_dir} writer] started") try: @@ -779,12 +778,10 @@ def _run_async_hf_writer( status_path=status_path, ) if rank == 0: - elapsed = time.time() - writer_start - logger.info(f"[Async saving HF to {tmp_hf_dir} writer] finished in {elapsed:.2f}s") + logger.info(f"[Async saving HF to {tmp_hf_dir} writer] finished") except Exception as exc: if rank == 0: - elapsed = time.time() - writer_start - logger.error(f"[Async saving HF to {tmp_hf_dir} writer] failed after {elapsed:.2f}s: {exc}") + logger.error(f"[Async saving HF to {tmp_hf_dir} writer] failed: {exc}") status = {"rank": rank, "ok": False, "error": str(exc), "weight_map": {}} with status_path.open("w") as f: f.write(json.dumps(status, indent=2)) diff --git a/xtuner/v1/model/compose/base.py b/xtuner/v1/model/compose/base.py index fb99e150f..38fa63d34 100644 --- a/xtuner/v1/model/compose/base.py +++ b/xtuner/v1/model/compose/base.py @@ -1,6 +1,5 @@ import json import multiprocessing as py_mp -import time from pathlib import Path from shutil import rmtree from typing import Callable, Self, Sequence @@ -259,7 +258,6 @@ def _run_async_hf_compose_writer( cleanup_done_path: Path, rank: int, ) -> None: - writer_start = time.time() if rank == 0: logger.info(f"[Async saving HF to {tmp_hf_dir} writer] started") try: @@ -275,12 +273,10 @@ def _run_async_hf_compose_writer( with status_path.open("w") as f: f.write(json.dumps(status, indent=2)) if rank == 0: - elapsed = time.time() - writer_start - logger.info(f"[Async saving HF to {tmp_hf_dir} writer] finished in {elapsed:.2f}s") + logger.info(f"[Async saving HF to {tmp_hf_dir} writer] finished") except Exception as exc: if rank == 0: - elapsed = time.time() - writer_start - logger.error(f"[Async saving HF to {tmp_hf_dir} writer] failed after {elapsed:.2f}s: {exc}") + logger.error(f"[Async saving HF to {tmp_hf_dir} writer] failed: {exc}") status = {"rank": rank, "ok": False, "error": str(exc), "weight_map": {}} with status_path.open("w") as f: f.write(json.dumps(status, indent=2)) From 8bae23bff1f1a927d7ee582f07501caaf46b9fb3 Mon Sep 17 00:00:00 2001 From: zhangcaixian <2369173394@qq.com> Date: Wed, 27 May 2026 15:51:12 +0800 Subject: [PATCH 5/5] refactor: use log_rank0 for async HF writer logs --- xtuner/v1/model/base.py | 9 +++------ xtuner/v1/model/compose/base.py | 9 +++------ 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/xtuner/v1/model/base.py b/xtuner/v1/model/base.py index ead53a6d2..336ebfebf 100644 --- a/xtuner/v1/model/base.py +++ b/xtuner/v1/model/base.py @@ -762,8 +762,7 @@ def _run_async_hf_writer( cleanup_done_path: Path, rank: int, ) -> None: - if rank == 0: - logger.info(f"[Async saving HF to {tmp_hf_dir} writer] started") + log_rank0.info(f"[Async saving HF to {tmp_hf_dir} writer] started") try: set_async_save_process_qos() self._cleanup_async_hf_dirs_before_write( @@ -777,11 +776,9 @@ def _run_async_hf_writer( weight_map=weight_map, status_path=status_path, ) - if rank == 0: - logger.info(f"[Async saving HF to {tmp_hf_dir} writer] finished") + log_rank0.info(f"[Async saving HF to {tmp_hf_dir} writer] finished") except Exception as exc: - if rank == 0: - logger.error(f"[Async saving HF to {tmp_hf_dir} writer] failed: {exc}") + log_rank0.error(f"[Async saving HF to {tmp_hf_dir} writer] failed: {exc}") status = {"rank": rank, "ok": False, "error": str(exc), "weight_map": {}} with status_path.open("w") as f: f.write(json.dumps(status, indent=2)) diff --git a/xtuner/v1/model/compose/base.py b/xtuner/v1/model/compose/base.py index 38fa63d34..b970aaad5 100644 --- a/xtuner/v1/model/compose/base.py +++ b/xtuner/v1/model/compose/base.py @@ -258,8 +258,7 @@ def _run_async_hf_compose_writer( cleanup_done_path: Path, rank: int, ) -> None: - if rank == 0: - logger.info(f"[Async saving HF to {tmp_hf_dir} writer] started") + log_rank0.info(f"[Async saving HF to {tmp_hf_dir} writer] started") try: set_async_save_process_qos() self._cleanup_async_hf_dirs_before_write( @@ -272,11 +271,9 @@ def _run_async_hf_compose_writer( status = {"rank": rank, "ok": True, "error": "", "weight_map": merged_weight_map} with status_path.open("w") as f: f.write(json.dumps(status, indent=2)) - if rank == 0: - logger.info(f"[Async saving HF to {tmp_hf_dir} writer] finished") + log_rank0.info(f"[Async saving HF to {tmp_hf_dir} writer] finished") except Exception as exc: - if rank == 0: - logger.error(f"[Async saving HF to {tmp_hf_dir} writer] failed: {exc}") + log_rank0.error(f"[Async saving HF to {tmp_hf_dir} writer] failed: {exc}") status = {"rank": rank, "ok": False, "error": str(exc), "weight_map": {}} with status_path.open("w") as f: f.write(json.dumps(status, indent=2))