From c4ed32d035b6bc2a3525d2b6d7284256685275c4 Mon Sep 17 00:00:00 2001 From: galois777 Date: Sun, 24 May 2026 20:27:57 -0700 Subject: [PATCH] Add headless processing for face-swap pipeline and Modal cloud integration - Implemented headless driver in `headless.py` to facilitate face-swapping without a GUI, allowing for GPU compute on Modal cloud. - Created `modal_app.py` for running the face-swap pipeline on Modal, including functions for model preparation, face swapping, and face detection. - Added example settings files: `modal_settings.example.json` for single source face swaps and `modal_settings.multiface.example.json` for multi-face mappings. --- .DS_Store | Bin 0 -> 10244 bytes .gitignore | 12 +- app/headless.py | 447 ++++++++++++++++++++++++++ app/processors/models_data.py | 3 +- modal_app.py | 389 ++++++++++++++++++++++ modal_settings.example.json | 28 ++ modal_settings.multiface.example.json | 23 ++ 7 files changed, 899 insertions(+), 3 deletions(-) create mode 100644 .DS_Store create mode 100644 app/headless.py create mode 100644 modal_app.py create mode 100644 modal_settings.example.json create mode 100644 modal_settings.multiface.example.json diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..23b5b5648bd4299d2c00e75b2d1b2ddf480f0bf0 GIT binary patch literal 10244 zcmeHNU2GIp6uxI#=p8!RX`z5Dr30(dKpWZu@)vO1T~uhHz_#qR1-G+1109*2S$Af) zV6EW+u7! z+aZY}F0N)k-D+X|P99Nh*%7`yZTyl2;+?{~GGWa(X@K#4Z z#WN=m;WDTL1OfzRBEatMv&aFWk_^$O-QQ29bEe_A?d|74sH&bdyN1@#TDm_zYLCTT z)h#&PYJQmY`fb}xm)mN3uc42n1eVqVOW|y)@>Io&GCHWe1M8n zOv&@d$2&S(m5$DdR%LwM>h-P4nzidDCPdm8UbSgY>X4bW?4$e$;C}|lEfq(4V)}VZ zrx@k=h^komJm<^j5$DuO(ygI^y@}-DP$DJE1xMebo2qLR^_c4FPD-wG@POMgwzVs- z7x7?;jBOUO7LL?sR5MdRW4+5XNHu3=k)DY=0;_2pR4*{kOrlu1`V z>@o*7`!xIHkYnfFsJBAWz(twWkb1b^cM!$5>lQXGyK`mx+KpSH_xAKn%JmI%rABE% zqG&iq+SCWe49y)KRGo}&X@+%h2;sF{HJ`z&XXU=PvjE0O#TD`s(?a0EP}<*1WRB!w7?2j3G1N? zx*-aEkc2@PhJB#I18@kkV1WYJP$9!tMD2ehj-w8_y9hHkKikq zgm2&+T!5e97q|$Qg&Lt=Xb|QKi-jhkSx|)4!Wv<-uv16~BZAKNE|jBd*guOe1PdEe zz7`l=ezs5e_!`lr+_!CePw$THH$!xNTgT#QUgP`)3zxDW+pwkk3L80j%Y2<02OjmO z1&l|1j}4Wy#i(1{v_w({FgEnVmC;&`V!4LJDxGB3%fgZ(&%)^8-Q_LVosE&hyRB`C zA`2L>yxY;KNOBd1qv-kG)rMX0I~zAil3a~}$;a+qf?eWwqA{F?g&F<#2!D=zPkto7 zlHbW?s0A4s;dW?))vy8S-2*#d7wm>TNb&uUf)S7ACX(ES92|jhB>E#r^`r14JOxj~ zGw=eu2rqebKMt?M8}KH)3n$K;34W`f_09@fGrCWqbupf| zOxrq$S~gNc9Ac9K2J!#rC$i#9=E06Qes^p{h2BvH)Xa)!b2m}ZO2m}ZO2wWcmRoq=+`Twog{`>#+EfRPK2m}b+&TZ;HHz{00B62b%x@ literal 0 HcmV?d00001 diff --git a/.gitignore b/.gitignore index d20fca6a..4e9ae92c 100644 --- a/.gitignore +++ b/.gitignore @@ -17,7 +17,7 @@ merged_embeddings*.txt .vs *.sln *.pyproj -*.json +# *.json .vscode/ tensorrt-engines/ source_videos/ @@ -34,4 +34,12 @@ dependencies/TensorRT/ *.mp4 *.jpg *.exe -.thumbnails \ No newline at end of file +.thumbnails + +# disable temporary files +/faces +/sources +group.png + +# config json +myface.json \ No newline at end of file diff --git a/app/headless.py b/app/headless.py new file mode 100644 index 00000000..e84ea8f0 --- /dev/null +++ b/app/headless.py @@ -0,0 +1,447 @@ +"""Headless driver for VisoMaster's face-swap pipeline. + +The desktop app drives the GPU compute (``FrameWorker.process_frame`` -> +``ModelsProcessor`` -> ONNX models) entirely off Qt widget state held on the +``MainWindow``. This module provides a lightweight stand-in for that window so +the *exact same* compute path can run without a display — e.g. on a Modal GPU. + +Nothing here re-implements the swap logic: it reconstructs the GUI state the +pipeline reads (``control`` dict, per-face ``parameters``, ``target_faces`` with +embeddings) and then calls the real ``FrameWorker.process_frame``. +""" + +import gc +import queue +import subprocess +import types +from pathlib import Path + +import cv2 +import numpy as np +import torch + +from app.helpers.miscellaneous import ParametersDict +from app.processors.models_processor import ModelsProcessor +from app.processors.workers.frame_worker import FrameWorker +from app.ui.widgets.common_layout_data import COMMON_LAYOUT_DATA +from app.ui.widgets.swapper_layout_data import SWAPPER_LAYOUT_DATA +from app.ui.widgets.settings_layout_data import SETTINGS_LAYOUT_DATA +from app.ui.widgets.face_editor_layout_data import FACE_EDITOR_LAYOUT_DATA + +# The four ArcFace models double as both the recognition (matching) keyspace and +# the swapper-input embedding keyspace, so computing all of them covers any swapper. +RECOGNITION_OPTIONS = list( + SETTINGS_LAYOUT_DATA["Face Recognition"]["RecognitionModelSelection"]["options"] +) + + +# --------------------------------------------------------------------------- # +# Default control/parameter construction (mirrors layout_actions widget setup) # +# --------------------------------------------------------------------------- # +def _typed_default(widget_name: str, widget_data: dict): + """Coerce a layout-data ``default`` to the type the running app stores. + + Mirrors ``layout_actions.add_widgets_to_tab_layout``: toggles -> bool, + selections -> str, decimal sliders -> float, plain sliders -> int. + """ + default = widget_data["default"] + if callable(default): + default = default() + if "Toggle" in widget_name: + return bool(default) + if "Selection" in widget_name: + return default + if "DecimalSlider" in widget_name: + return float(default) + if "Slider" in widget_name: # checked after DecimalSlider on purpose + return int(default) + return default # Text and anything else + + +def _build_defaults(layout: dict) -> dict: + out = {} + for _group, widgets in layout.items(): + for widget_name, widget_data in widgets.items(): + out[widget_name] = _typed_default(widget_name, widget_data) + return out + + +class _NoopSignal: + def emit(self, *args, **kwargs): + pass + + +class _Button: + """Stand-in for a checkable Qt button.""" + + def __init__(self, checked: bool): + self._checked = checked + + def isChecked(self) -> bool: + return self._checked + + +class HeadlessTargetFace: + """Minimal stand-in for ``TargetFaceCardButton`` used during matching/swap.""" + + def __init__(self, face_id: str, embedding_store: dict, assigned_input_embedding: dict): + self.face_id = face_id + self.embedding_store = embedding_store + self.assigned_input_embedding = assigned_input_embedding + + def get_embedding(self, embedding_swap_model: str) -> np.ndarray: + return self.embedding_store.get(embedding_swap_model, np.array([])) + + +class HeadlessMainWindow: + """Provides exactly the attributes the compute path reads off ``MainWindow``.""" + + def __init__(self, device: str = "cuda", provider: str = "CUDA"): + # Signals the pipeline emits for the (now absent) loading dialog. + self.model_loading_signal = _NoopSignal() + self.model_loaded_signal = _NoopSignal() + + # Layout-derived defaults, exactly as the GUI would initialise them. + self.default_parameters = {} + for layout in (COMMON_LAYOUT_DATA, SWAPPER_LAYOUT_DATA, FACE_EDITOR_LAYOUT_DATA): + self.default_parameters.update(_build_defaults(layout)) + self.control = _build_defaults(SETTINGS_LAYOUT_DATA) + self.control["OutputMediaFolder"] = "" + + self.parameters = {} # face_id -> ParametersDict + self.current_widget_parameters = ParametersDict( + dict(self.default_parameters), self.default_parameters + ) + self.selected_target_face_id = False + self.target_faces = {} + self.dfm_models_data = {} + self.markers = {} + + # Button states: swap on, everything else off. + self.swapfacesButton = _Button(True) + self.editFacesButton = _Button(False) + self.faceCompareCheckBox = _Button(False) + self.faceMaskCheckBox = _Button(False) + + # FrameWorker only stores this reference; the swap path never touches it. + self.video_processor = types.SimpleNamespace(file_type="video", processing=False) + + self.models_processor = ModelsProcessor(self, device=device) + # Force a plain CUDA/ONNX provider (no TensorRT engine building for the + # batch path); also sets device and moves the LivePortrait mask to GPU. + self.models_processor.switch_providers_priority(provider) + + +# --------------------------------------------------------------------------- # +# Embeddings # +# --------------------------------------------------------------------------- # +def compute_embedding_store(models_processor, bgr_image: np.ndarray, control: dict, + max_num: int = 1) -> dict: + """Detect the most prominent face in a BGR image and return its embeddings + across every recognition model (the same store the GUI builds per face).""" + rgb = np.ascontiguousarray(bgr_image[..., ::-1]) + img = torch.from_numpy(rgb.astype("uint8")).to(models_processor.device).permute(2, 0, 1) + + _bboxes, kpss_5, _kpss = models_processor.run_detect( + img, + control["DetectorModelSelection"], + max_num=max_num, + score=control["DetectorScoreSlider"] / 100.0, + input_size=(512, 512), + use_landmark_detection=control["LandmarkDetectToggle"], + landmark_detect_mode=control["LandmarkDetectModelSelection"], + landmark_score=control["LandmarkDetectScoreSlider"] / 100.0, + from_points=control["DetectFromPointsToggle"], + ) + if len(kpss_5) == 0: + raise ValueError("No face detected in the provided face image.") + + kps_5 = kpss_5[0] + store = {} + for model in RECOGNITION_OPTIONS: + embedding, _crop = models_processor.run_recognize_direct( + img, kps_5, control["SimilarityTypeSelection"], model + ) + store[model] = embedding + return store + + +def merge_embedding_stores(stores: list, method: str = "Mean") -> dict: + """Average several per-face embedding stores (matches TargetFaceCardButton + .calculate_assigned_input_embedding).""" + if not stores: + return {} + reducer = np.median if method == "Median" else np.mean + models = set().union(*[s.keys() for s in stores]) + return { + model: reducer([s[model] for s in stores if model in s], axis=0) + for model in models + } + + +# --------------------------------------------------------------------------- # +# Settings -> control / parameters # +# --------------------------------------------------------------------------- # +_CONTROL_SHORTCUTS = { + "detector_model": "DetectorModelSelection", + "detector_score": "DetectorScoreSlider", + "recognition_model": "RecognitionModelSelection", + "max_faces": "MaxFacesToDetectSlider", + "landmark_detect": "LandmarkDetectToggle", +} + + +def apply_control_settings(mw: HeadlessMainWindow, settings: dict): + settings = settings or {} + for key, control_key in _CONTROL_SHORTCUTS.items(): + if key in settings: + mw.control[control_key] = settings[key] + for key, value in (settings.get("control") or {}).items(): + mw.control[key] = value + + +def make_face_parameters(mw: HeadlessMainWindow, settings: dict, + similarity_threshold, extra_overrides: dict = None) -> ParametersDict: + """Build a per-face ParametersDict from defaults + global shortcuts + global + ``parameters`` + this face's ``extra_overrides``. Missing keys fall back to + defaults. Per-face overrides win over global; the threshold arg always wins.""" + settings = settings or {} + overrides = dict(settings.get("parameters") or {}) + + if "swap_model" in settings: + overrides["SwapModelSelection"] = settings["swap_model"] + + restorer = settings.get("face_restorer") + if restorer: + overrides["FaceRestorerEnableToggle"] = bool(restorer.get("enabled", True)) + if "type" in restorer: + overrides["FaceRestorerTypeSelection"] = restorer["type"] + if "det_type" in restorer: + overrides["FaceRestorerDetTypeSelection"] = restorer["det_type"] + if "blend" in restorer: + overrides["FaceRestorerBlendSlider"] = restorer["blend"] + if "fidelity" in restorer: + overrides["FaceFidelityWeightDecimalSlider"] = restorer["fidelity"] + + if extra_overrides: + overrides.update(extra_overrides) + overrides["SimilarityThresholdSlider"] = similarity_threshold + + return ParametersDict(overrides, mw.default_parameters) + + +def _register_target_face(mw: HeadlessMainWindow, face_id: str, target_img_bgr, + source_imgs_bgr: list, threshold, settings: dict, + extra_overrides: dict = None): + """Build one target face: source identity to swap in (averaged over the + given source images) matched against a reference (or itself, for swap-all).""" + source_stores = [ + compute_embedding_store(mw.models_processor, img, mw.control) + for img in source_imgs_bgr + ] + assigned = merge_embedding_stores( + source_stores, mw.control.get("EmbMergeMethodSelection", "Mean") + ) + if not assigned: + raise ValueError("Could not build a source embedding from the given face image(s).") + + if target_img_bgr is not None: + target_store = compute_embedding_store(mw.models_processor, target_img_bgr, mw.control) + else: + # Swap-all: a non-degenerate target embedding (reuse source) + threshold 0 + # makes every detected face match. + target_store = assigned + + mw.target_faces[face_id] = HeadlessTargetFace(face_id, target_store, assigned) + mw.parameters[face_id] = make_face_parameters(mw, settings, threshold, extra_overrides) + + +def setup(mw: HeadlessMainWindow, entries: list, settings: dict): + """Configure one or more face swaps. This is the headless equivalent of the + GUI's target-face cards + assigned source faces. + + ``entries`` is a list of dicts: + - ``source_imgs``: list[BGR ndarray] — identity to swap *in* (averaged). + - ``target_img``: BGR ndarray | None — reference of who to *replace*. + May be None only for a single swap-all entry. + - ``parameters``: dict (optional) — per-face parameter overrides. + - ``swap_model``: str (optional) — per-face swapper model. + - ``threshold``: int (optional) — per-face match threshold. + + Each detected face in a frame is matched (by ArcFace embedding) against every + entry's reference; the first whose similarity >= its threshold is swapped to + that entry's source identity. Faces matching no entry are left untouched. + """ + apply_control_settings(mw, settings) + if not entries: + raise ValueError("No swap entries provided.") + + global_threshold = settings.get("similarity_threshold", 60) + swap_all = len(entries) == 1 and entries[0].get("target_img") is None + + for i, entry in enumerate(entries): + target_img = entry.get("target_img") + if target_img is None and not swap_all: + raise ValueError( + "Multi-face mode requires a target_face reference for every swap entry." + ) + extra = dict(entry.get("parameters") or {}) + if entry.get("swap_model"): + extra["SwapModelSelection"] = entry["swap_model"] + threshold = 0 if target_img is None else entry.get("threshold", global_threshold) + _register_target_face(mw, str(i), target_img, entry["source_imgs"], + threshold, settings, extra) + + +def detect_face_crops(mw: HeadlessMainWindow, bgr_image: np.ndarray, + pad: float = 0.4, dedup_threshold=None) -> list: + """Detect faces in a BGR image and return padded crops in reading order. + + Used by the ``find-faces`` helper so the distinct people in a group shot can + be saved as numbered references and paired with source identities. Returns a + list of dicts: ``{"crop": BGR ndarray, "bbox": (x1, y1, x2, y2)}``. + """ + rgb = np.ascontiguousarray(bgr_image[..., ::-1]) + img = torch.from_numpy(rgb.astype("uint8")).to(mw.models_processor.device).permute(2, 0, 1) + control = mw.control + + bboxes, kpss_5, _kpss = mw.models_processor.run_detect( + img, + control["DetectorModelSelection"], + max_num=control["MaxFacesToDetectSlider"], + score=control["DetectorScoreSlider"] / 100.0, + input_size=(512, 512), + use_landmark_detection=control["LandmarkDetectToggle"], + landmark_detect_mode=control["LandmarkDetectModelSelection"], + landmark_score=control["LandmarkDetectScoreSlider"] / 100.0, + from_points=control["DetectFromPointsToggle"], + ) + + height, width = bgr_image.shape[:2] + seen_embeddings = [] + faces = [] + for i in range(len(kpss_5)): + if dedup_threshold is not None: + emb, _ = mw.models_processor.run_recognize_direct( + img, kpss_5[i], control["SimilarityTypeSelection"], + control["RecognitionModelSelection"], + ) + if any(mw.models_processor.findCosineDistance(emb, e) >= dedup_threshold + for e in seen_embeddings): + continue + seen_embeddings.append(emb) + + x1, y1, x2, y2 = (float(v) for v in bboxes[i][:4]) + px, py = int((x2 - x1) * pad), int((y2 - y1) * pad) + cx1, cy1 = max(0, int(x1 - px)), max(0, int(y1 - py)) + cx2, cy2 = min(width, int(x2 + px)), min(height, int(y2 + py)) + crop = np.ascontiguousarray(bgr_image[cy1:cy2, cx1:cx2]) + faces.append({"crop": crop, "bbox": (cx1, cy1, cx2, cy2)}) + + # Reading order: group into rough rows, then left-to-right. + faces.sort(key=lambda f: (f["bbox"][1] // 50, f["bbox"][0])) + return faces + + +# --------------------------------------------------------------------------- # +# Frame processing # +# --------------------------------------------------------------------------- # +_DUMMY_QUEUE = queue.Queue() + + +def process_rgb_frame(mw: HeadlessMainWindow, rgb_frame: np.ndarray, frame_number: int) -> np.ndarray: + """Run the real per-frame swap pipeline on one RGB frame; returns BGR.""" + worker = FrameWorker( + np.ascontiguousarray(rgb_frame.astype("uint8")), + mw, frame_number, _DUMMY_QUEUE, is_single_frame=True, + ) + worker.parameters = mw.parameters + worker.is_view_face_compare = False + worker.is_view_face_mask = False + return np.ascontiguousarray(worker.process_frame()) + + +def _ffmpeg_writer(width: int, height: int, fps: float, out_path: str) -> subprocess.Popen: + args = [ + "ffmpeg", "-hide_banner", "-loglevel", "error", "-y", + "-f", "rawvideo", "-pix_fmt", "bgr24", + "-s", f"{width}x{height}", "-r", str(fps), "-i", "pipe:", + "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2,format=yuvj420p", + "-c:v", "libx264", "-crf", "18", + out_path, + ] + return subprocess.Popen(args, stdin=subprocess.PIPE) + + +def _mux_audio(video_only_path: str, source_media_path: str, out_path: str): + """Copy the source's audio track onto the swapped video (audio optional).""" + args = [ + "ffmpeg", "-hide_banner", "-loglevel", "error", "-y", + "-i", video_only_path, + "-i", source_media_path, + "-c", "copy", + "-map", "0:v:0", "-map", "1:a:0?", + "-shortest", + out_path, + ] + subprocess.run(args, check=False) + + +def swap_image(mw: HeadlessMainWindow, in_path: str, out_path: str): + bgr = cv2.imread(in_path, cv2.IMREAD_COLOR) + if bgr is None: + raise ValueError(f"Could not read image: {in_path}") + out_bgr = process_rgb_frame(mw, bgr[..., ::-1], 0) + cv2.imwrite(out_path, out_bgr) + + +def swap_video(mw: HeadlessMainWindow, in_path: str, out_path: str, + progress_every: int = 30): + """Swap every frame of a video and mux the original audio back in.""" + cap = cv2.VideoCapture(in_path) + if not cap.isOpened(): + raise ValueError(f"Could not open video: {in_path}") + fps = cap.get(cv2.CAP_PROP_FPS) or 30.0 + total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0) + + temp_path = str(Path(out_path).with_suffix(".video_only.mp4")) + writer = None + out_w = out_h = None + frame_idx = 0 + + try: + while True: + ret, frame_bgr = cap.read() + if not ret: + break + out_bgr = process_rgb_frame(mw, frame_bgr[..., ::-1], frame_idx) + + if writer is None: + out_h, out_w = out_bgr.shape[:2] + writer = _ffmpeg_writer(out_w, out_h, fps, temp_path) + elif out_bgr.shape[0] != out_h or out_bgr.shape[1] != out_w: + # Keep a constant raw-video size (e.g. if a restorer/enhancer + # changes dimensions on some frames). + out_bgr = np.ascontiguousarray(cv2.resize(out_bgr, (out_w, out_h))) + + writer.stdin.write(out_bgr.tobytes()) + frame_idx += 1 + + if frame_idx % progress_every == 0: + print(f"Processed {frame_idx}/{total or '?'} frames", flush=True) + torch.cuda.empty_cache() + finally: + cap.release() + if writer is not None: + writer.stdin.close() + writer.wait() + gc.collect() + torch.cuda.empty_cache() + + if writer is None: + raise ValueError("No frames were read from the input video.") + + _mux_audio(temp_path, in_path, out_path) + Path(temp_path).unlink(missing_ok=True) + print(f"Done: {frame_idx} frames -> {out_path}", flush=True) + return out_path diff --git a/app/processors/models_data.py b/app/processors/models_data.py index 72a76961..f549e9ba 100644 --- a/app/processors/models_data.py +++ b/app/processors/models_data.py @@ -1,4 +1,5 @@ -models_dir = './model_assets' +import os +models_dir = os.environ.get("VISOMASTER_MODELS_DIR", "./model_assets") assets_repo = "https://github.com/visomaster/visomaster-assets/releases/download" try: diff --git a/modal_app.py b/modal_app.py new file mode 100644 index 00000000..b3217bbe --- /dev/null +++ b/modal_app.py @@ -0,0 +1,389 @@ +"""Run VisoMaster's face-swap pipeline on a Modal cloud GPU (headless batch). + +This deliberately keeps all heavy imports (torch, onnxruntime, app.*) *inside* +the remote functions so the file can be driven from a machine without a GPU or +the CUDA stack installed (e.g. macOS). + +Usage +----- +1. One-time: download the ~63 model files into a persistent Volume: + + modal run modal_app.py::download + +2. Swap a video (replace every detected face with the source identity): + + modal run modal_app.py --target path/to/video.mp4 \ + --source path/to/source_face.jpg \ + --output out.mp4 + + Only replace a specific person (give a reference image of who to replace): + + modal run modal_app.py --target video.mp4 --source new_face.jpg \ + --target-face person_to_replace.jpg --settings settings.json --output out.mp4 + +3. Many faces -> many people. First extract the distinct faces as references: + + modal run modal_app.py::find_faces --target group.jpg # -> faces/face_00.png ... + + Then map each one to a source identity in a settings 'swaps' list and run: + + modal run modal_app.py --target group.jpg \ + --settings modal_settings.multiface.example.json --output out.jpg + +See modal_settings.example.json (single source) and +modal_settings.multiface.example.json (per-person mapping) for the schema. +""" + +import json +from pathlib import Path + +import modal + +REPO_DIR = Path(__file__).parent +MODELS_DIR = "/models" + +# GPU type. "L40S" (48GB) is a safe, fast default. Cheaper: "A10" (24GB) is +# plenty for Inswapper128 + a restorer. Faster/bigger: "A100", "H100". +GPU = "L40S" + +# Anything heavier than this likely needs the output streamed via the Volume +# instead of returned inline; fine for typical clips. +FUNCTION_TIMEOUT = 3600 # seconds + +image = ( + modal.Image.from_registry( + "nvidia/cuda:12.4.1-cudnn-runtime-ubuntu22.04", add_python="3.10" + ) + # Pip layers first so a later system-deps tweak doesn't re-download torch. + # Torch built against CUDA 12.4. + .pip_install( + "torch==2.4.1", + "torchvision==0.19.1", + index_url="https://download.pytorch.org/whl/cu124", + ) + # Everything else from PyPI. TensorRT is intentionally omitted — the batch + # path runs on the CUDA ONNX Runtime provider, and the code already guards + # `import tensorrt` with try/except. + .pip_install( + "numpy==1.26.4", + "opencv-python-headless==4.10.0.84", + "scikit-image==0.21.0", + "pillow==9.5.0", + "onnx==1.16.1", + "protobuf==4.23.2", + "psutil==6.0.0", + "onnxruntime-gpu==1.20.0", + "packaging==24.1", + "PySide6==6.8.2.1", + "kornia", + "tqdm", + "ftfy", + "regex", + "numexpr", + "onnxsim", + "requests", + "pyqt-toast-notification==1.3.2", + "qdarkstyle", + "pyqtdarktheme", + ) + # ffmpeg + the system libraries PySide6's Qt6 needs (QtCore/QtGui/QtWidgets + # are imported by the compute chain even though we run headless/offscreen). + # binutils provides `strip` for the ABI-tag fix below. + .apt_install( + "ffmpeg", + "binutils", + "libgl1", "libegl1", "libopengl0", "libglib2.0-0", + "libdbus-1-3", "libxkbcommon0", "libxkbcommon-x11-0", + "libfontconfig1", "libfreetype6", + "libx11-6", "libxext6", "libxrender1", "libsm6", "libice6", + "libxcb1", "libxcb-cursor0", "libxcb-render0", "libxcb-render-util0", + "libxcb-shape0", "libxcb-xfixes0", "libxcb-randr0", "libxcb-icccm4", + "libxcb-image0", "libxcb-keysyms1", "libxcb-shm0", "libxcb-sync1", + "libxcb-xinerama0", "libxcb-util1", "libxcb-xkb1", + ) + # Modal runs under gVisor, which reports an old kernel version. PySide6 6.8's + # Qt libs declare a higher minimum kernel in their .note.ABI-tag, so glibc's + # loader rejects them ("libQt6Core.so.6: cannot open shared object file"). + # Stripping that note removes the kernel-version gate; the libs run fine. + .run_commands( + "find / -path '*/PySide6/*' -name '*.so*' -type f " + "-exec strip --remove-section=.note.ABI-tag {} + 2>/dev/null; " + "find / -path '*/shiboken6/*' -name '*.so*' -type f " + "-exec strip --remove-section=.note.ABI-tag {} + 2>/dev/null; true" + ) + .env( + { + "VISOMASTER_MODELS_DIR": MODELS_DIR, + # No display in the container; keeps incidental Qt calls headless. + "QT_QPA_PLATFORM": "offscreen", + } + ) + .add_local_dir( + str(REPO_DIR), + remote_path="/root/VisoMaster", + ignore=[ + ".git", + "**/__pycache__", + ".github", + "tensorrt-engines", + "output", + "*.mp4", + "*.avi", + "*.mkv", + "*.mov", + ], + ) +) + +app = modal.App("visomaster-swap", image=image) +models_volume = modal.Volume.from_name("visomaster-models", create_if_missing=True) + +REMOTE_REPO = "/root/VisoMaster" + + +def _bootstrap(): + """Make the in-container repo importable and model-dir aware.""" + import os + import sys + + os.environ.setdefault("VISOMASTER_MODELS_DIR", MODELS_DIR) + os.chdir(REMOTE_REPO) + if REMOTE_REPO not in sys.path: + sys.path.insert(0, REMOTE_REPO) + + +@app.function(volumes={MODELS_DIR: models_volume}, timeout=FUNCTION_TIMEOUT) +def prepare_models(): + """Populate the Volume: copy the small committed assets, then download the + model files from the VisoMaster assets release. Safe to re-run (skips + files already present with a valid hash).""" + import os + import shutil + + _bootstrap() + + # Committed assets (plugins, meanshape/lip pkls) baked into the image. + committed = Path(REMOTE_REPO) / "model_assets" + if committed.exists(): + shutil.copytree(committed, MODELS_DIR, dirs_exist_ok=True) + + # Import only after the model dir env var is set (paths resolve from it). + from app.helpers.downloader import download_file + from app.processors.models_data import models_list + + os.makedirs(MODELS_DIR, exist_ok=True) + failures = [] + for md in models_list: + os.makedirs(os.path.dirname(md["local_path"]), exist_ok=True) + ok = download_file(md["model_name"], md["local_path"], md["hash"], md.get("url")) + if not ok: + failures.append(md["model_name"]) + + models_volume.commit() + print(f"\nModel preparation complete. {len(models_list) - len(failures)}/{len(models_list)} ready.") + if failures: + print("Failed downloads:", ", ".join(failures)) + + +def _decode_image(buf: bytes): + import cv2 + import numpy as np + + img = cv2.imdecode(np.frombuffer(buf, np.uint8), cv2.IMREAD_COLOR) + if img is None: + raise ValueError("Could not decode a provided image.") + return img + + +@app.function(gpu=GPU, volumes={MODELS_DIR: models_volume}, timeout=FUNCTION_TIMEOUT) +def swap(target_media: bytes, target_filename: str, swaps: list, settings: dict) -> bytes: + """Swap faces in an image or video and return the encoded result bytes. + + ``swaps`` is a list of mappings; each has ``source_faces`` (list of image + bytes), an optional ``target_face`` (reference image bytes; None => swap all + faces), and optional per-face ``parameters``/``swap_model``/``threshold``. + """ + import tempfile + + _bootstrap() + from app import headless + from app.helpers.miscellaneous import is_image_file + + entries = [] + for s in swaps: + ref = s.get("target_face") + entry = { + "target_img": _decode_image(ref) if ref else None, + "source_imgs": [_decode_image(b) for b in s["source_faces"]], + "parameters": s.get("parameters") or {}, + } + if s.get("swap_model"): + entry["swap_model"] = s["swap_model"] + if s.get("threshold") is not None: + entry["threshold"] = s["threshold"] + entries.append(entry) + + mw = headless.HeadlessMainWindow(device="cuda") + headless.setup(mw, entries, settings) + + with tempfile.TemporaryDirectory() as tmp: + suffix = Path(target_filename).suffix or ".mp4" + in_path = str(Path(tmp) / f"input{suffix}") + Path(in_path).write_bytes(target_media) + + if is_image_file(target_filename): + out_path = str(Path(tmp) / "output.png") + headless.swap_image(mw, in_path, out_path) + else: + out_path = str(Path(tmp) / "output.mp4") + headless.swap_video(mw, in_path, out_path) + + return Path(out_path).read_bytes() + + +@app.function(gpu=GPU, volumes={MODELS_DIR: models_volume}, timeout=FUNCTION_TIMEOUT) +def detect_faces(target_media: bytes, target_filename: str, frame: int, settings: dict) -> list: + """Detect distinct faces in an image (or one video frame) and return a list + of PNG-encoded crops in reading order — references for the ``swaps`` mapping.""" + import tempfile + + import cv2 + + _bootstrap() + from app import headless + from app.helpers.miscellaneous import is_image_file + + if is_image_file(target_filename): + img_bgr = _decode_image(target_media) + else: + with tempfile.TemporaryDirectory() as tmp: + vid = str(Path(tmp) / ("v" + (Path(target_filename).suffix or ".mp4"))) + Path(vid).write_bytes(target_media) + cap = cv2.VideoCapture(vid) + if frame: + cap.set(cv2.CAP_PROP_POS_FRAMES, frame) + ok, img_bgr = cap.read() + cap.release() + if not ok: + raise ValueError(f"Could not read frame {frame} from the video.") + + mw = headless.HeadlessMainWindow(device="cuda") + headless.apply_control_settings(mw, settings or {}) + + crops = [] + for face in headless.detect_face_crops(mw, img_bgr): + ok, buf = cv2.imencode(".png", face["crop"]) + if ok: + crops.append(bytes(buf)) + return crops + + +@app.local_entrypoint() +def download(): + """One-time model download into the persistent Volume.""" + prepare_models.remote() + + +_IMAGE_SUFFIXES = (".jpg", ".jpeg", ".png", ".webp", ".bmp") + + +@app.local_entrypoint() +def find_faces(target: str, out_dir: str = "faces", frame: int = 0, settings: str = ""): + """Extract each distinct face from the target into numbered crops you can + pair with source identities in a settings 'swaps' list. + + modal run modal_app.py::find_faces --target group.jpg + """ + target_path = Path(target) + if not target_path.is_file(): + raise SystemExit(f"Target media not found: {target}") + settings_dict = json.loads(Path(settings).read_text()) if settings else {} + + print(f"Detecting faces in {target_path.name} on Modal [{GPU}]...") + crops = detect_faces.remote( + target_media=target_path.read_bytes(), + target_filename=target_path.name, + frame=frame, + settings=settings_dict, + ) + out = Path(out_dir) + out.mkdir(parents=True, exist_ok=True) + for i, png in enumerate(crops): + (out / f"face_{i:02d}.png").write_bytes(png) + print(f"Saved {len(crops)} face(s) to {out}/ (face_00.png ...).") + print("Pair each with a source in your settings JSON 'swaps' list, then run the swap.") + + +def _resolve(path: str, base: Path) -> Path: + p = Path(path) + return p if p.is_absolute() else base / p + + +@app.local_entrypoint() +def main(target: str, source: str = "", source_dir: str = "", + target_face: str = "", settings: str = "", output: str = ""): + """Submit a swap job. Runs locally; only file IO happens on this machine. + + Multi-face: put a 'swaps' list in --settings (reference image per person -> + source identity). Single/all: use --source (+ optional --target-face). + """ + target_path = Path(target) + if not target_path.is_file(): + raise SystemExit(f"Target media not found: {target}") + settings_dict = json.loads(Path(settings).read_text()) if settings else {} + + swaps_payload = [] + swaps_cfg = settings_dict.get("swaps") + if swaps_cfg: + base = Path(settings).resolve().parent + for entry in swaps_cfg: + srcs = entry.get("source_face") + srcs = [srcs] if isinstance(srcs, str) else list(srcs or []) + if not srcs: + raise SystemExit("Each 'swaps' entry needs a 'source_face'.") + ref = entry.get("target_face") + swaps_payload.append({ + "target_face": _resolve(ref, base).read_bytes() if ref else None, + "source_faces": [_resolve(s, base).read_bytes() for s in srcs], + "parameters": entry.get("parameters") or {}, + "swap_model": entry.get("swap_model"), + "threshold": entry.get("threshold"), + }) + print(f"Multi-face mode: {len(swaps_payload)} mapping(s).") + else: + source_paths = [] + if source: + source_paths.append(Path(source)) + if source_dir: + source_paths += sorted( + p for p in Path(source_dir).iterdir() + if p.suffix.lower() in _IMAGE_SUFFIXES + ) + if not source_paths: + raise SystemExit( + "Provide --source/--source-dir, or a 'swaps' list in --settings." + ) + swaps_payload.append({ + "target_face": Path(target_face).read_bytes() if target_face else None, + "source_faces": [p.read_bytes() for p in source_paths], + "parameters": {}, + "swap_model": None, + "threshold": None, + }) + print(f"Single-source mode: {len(source_paths)} source face(s), " + f"{'one person' if target_face else 'all faces'}.") + + is_image = target_path.suffix.lower() in _IMAGE_SUFFIXES + out_path = Path(output) if output else target_path.with_name( + target_path.stem + "_swapped" + (".png" if is_image else ".mp4") + ) + + print(f"Uploading {target_path.name} to Modal [{GPU}]...") + result = swap.remote( + target_media=target_path.read_bytes(), + target_filename=target_path.name, + swaps=swaps_payload, + settings=settings_dict, + ) + out_path.write_bytes(result) + print(f"Saved swapped output to {out_path}") diff --git a/modal_settings.example.json b/modal_settings.example.json new file mode 100644 index 00000000..ba9c767e --- /dev/null +++ b/modal_settings.example.json @@ -0,0 +1,28 @@ +{ + "swap_model": "Inswapper128", + "detector_model": "RetinaFace", + "detector_score": 50, + "max_faces": 20, + "similarity_threshold": 60, + + "face_restorer": { + "enabled": true, + "type": "GFPGAN-v1.4", + "blend": 100, + "fidelity": 0.5 + }, + + "control": { + "RecognitionModelSelection": "Inswapper128ArcFace", + "EmbMergeMethodSelection": "Mean" + }, + + "parameters": { + "SwapperResSelection": "128", + "BorderTopSlider": 10, + "BorderBottomSlider": 10, + "BorderLeftSlider": 10, + "BorderRightSlider": 10, + "OverallMaskBlendAmountSlider": 5 + } +} diff --git a/modal_settings.multiface.example.json b/modal_settings.multiface.example.json new file mode 100644 index 00000000..870dfe1b --- /dev/null +++ b/modal_settings.multiface.example.json @@ -0,0 +1,23 @@ +{ + "swap_model": "Inswapper128", + "detector_model": "RetinaFace", + "detector_score": 50, + "max_faces": 50, + "similarity_threshold": 60, + + "face_restorer": { "enabled": true, "type": "GFPGAN-v1.4", "blend": 100 }, + + "swaps": [ + { + "target_face": "faces/face_00.png", + "source_face": "sources/alice_new.jpg" + }, + { + "target_face": "faces/face_03.png", + "source_face": ["sources/bob1.jpg", "sources/bob2.jpg"], + "swap_model": "SimSwap512", + "threshold": 55, + "parameters": { "FaceRestorerEnableToggle": true } + } + ] +}