diff --git a/examples/swegym_slime_grpo/run_pi_interactive_apptainer.sh b/examples/swegym_slime_grpo/run_pi_interactive_apptainer.sh new file mode 100644 index 000000000..7dbf16673 --- /dev/null +++ b/examples/swegym_slime_grpo/run_pi_interactive_apptainer.sh @@ -0,0 +1,1196 @@ +#!/usr/bin/env bash +# Interactive SWE-Gym Slime GRPO launcher for PI + Qwen3.5-4B. +# +# Default mode is a small smoke run: +# bash examples/swegym_slime_grpo/run_pi_interactive_apptainer.sh +# +# Full 293-row training from an interactive allocation: +# SMOKE_NUM_ROWS=0 NUM_EPOCH=1 bash examples/swegym_slime_grpo/run_pi_interactive_apptainer.sh +# +# Dry-run preflight without starting Ray/SGLang/Slime: +# DRY_RUN=1 bash examples/swegym_slime_grpo/run_pi_interactive_apptainer.sh +set -euo pipefail + +SCRIPT_DIR="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" +PROJECT_ROOT="$(cd -- "${SCRIPT_DIR}/../.." && pwd)" +REFERENCE_ROOT="${REFERENCE_ROOT:-/lustre/fs1/portfolios/llmservice/projects/llmservice_fm_vision/users/shaokunz/polar/ProRL-Agent-Server}" +cd "${PROJECT_ROOT}" + +log() { + printf '[%s] %s\n' "$(date +'%F %T')" "$*" +} + +die() { + printf 'ERROR: %s\n' "$*" >&2 + exit 1 +} + +abs_path() { + case "$1" in + /*) printf '%s\n' "$1" ;; + *) printf '%s/%s\n' "${PROJECT_ROOT}" "$1" ;; + esac +} + +first_existing_dir() { + for candidate in "$@"; do + if [ -d "${candidate}" ]; then + printf '%s\n' "${candidate}" + return 0 + fi + done + printf '%s\n' "$1" +} + +first_existing_file() { + for candidate in "$@"; do + if [ -f "${candidate}" ]; then + printf '%s\n' "${candidate}" + return 0 + fi + done + printf '%s\n' "$1" +} + +detect_gpu_count() { + if command -v nvidia-smi >/dev/null 2>&1; then + local count + set +e + count="$(nvidia-smi --query-gpu=index --format=csv,noheader 2>/dev/null | wc -l | tr -d ' ')" + set -e + printf '%s\n' "${count:-0}" + else + printf '0\n' + fi +} + +detect_cpu_count() { + local count + count="$(getconf _NPROCESSORS_ONLN 2>/dev/null || printf '32')" + case "${count}" in + ''|*[!0-9]*) printf '32\n' ;; + *) printf '%s\n' "${count}" ;; + esac +} + +detect_host_ip() { + python3 - <<'PY' +import socket + +try: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.connect(("8.8.8.8", 80)) + print(sock.getsockname()[0]) + sock.close() +except Exception: + try: + print(socket.gethostbyname(socket.gethostname())) + except Exception: + print("127.0.0.1") +PY +} + +checkpoint_marker_exists() { + [ -f "$1/latest_checkpointed_iteration.txt" ] +} + +choose_ref_load() { + local current="${PROJECT_ROOT}/tmp/checkpoints/Qwen3.5-4B_torch_dist" + local reference="${REFERENCE_ROOT}/tmp/checkpoints/Qwen3.5-4B_torch_dist" + if checkpoint_marker_exists "${current}"; then + printf '%s\n' "${current}" + else + printf '%s\n' "${reference}" + fi +} + +choose_agent_cli_dir() { + local current="${PROJECT_ROOT}/tmp/swegym_agent_cli/opt_node" + local reference="${REFERENCE_ROOT}/tmp/swegym_agent_cli/opt_node" + if [ -x "${current}/bin/pi" ] && [ -x "${current}/bin/node" ]; then + printf '%s\n' "${current}" + elif [ -x "${reference}/bin/pi" ] && [ -x "${reference}/bin/node" ]; then + printf '%s\n' "${reference}" + else + printf '%s\n' "${current}" + fi +} + +choose_hf_checkpoint() { + local local_model="/lustre/fs1/portfolios/llmservice/projects/llmservice_fm_vision/users/shaokunz/model/Qwen3.5-4B" + if [ -d "${local_model}" ]; then + printf '%s\n' "${local_model}" + else + printf '%s\n' "Qwen/Qwen3.5-4B" + fi +} + +is_path_like() { + case "$1" in + /*|./*|../*|~*) return 0 ;; + *) return 1 ;; + esac +} + +# ============================================================================= +# User settings +# ============================================================================= +RUN_ID="${RUN_ID:-swegym_pi_qwen35_4b_$(date +%Y%m%d_%H%M%S)}" + +# Model names: +# - MODEL_NAME is the model served by SGLang. +# - PI_MODEL_NAME must be provider/model form for the pi CLI. +MODEL_NAME="${MODEL_NAME:-Qwen/Qwen3.5-4B}" +PI_MODEL_NAME="${PI_MODEL_NAME:-openai/${MODEL_NAME}}" +HF_CHECKPOINT="${HF_CHECKPOINT:-$(choose_hf_checkpoint)}" +REF_LOAD="$(abs_path "${REF_LOAD:-$(choose_ref_load)}")" +SAVE_DIR="$(abs_path "${SAVE_DIR:-${PROJECT_ROOT}/tmp/ckpt/${RUN_ID}}")" +FRESH_START="${FRESH_START:-1}" +RESUME_FROM_SAVE="${RESUME_FROM_SAVE:-0}" +START_ROLLOUT_ID="${START_ROLLOUT_ID:-}" + +# Data. SMOKE_NUM_ROWS=1 makes a quick one-task debug run. Set 0 for all rows. +FULL_PROMPT_DATA="$(abs_path "${FULL_PROMPT_DATA:-${SCRIPT_DIR}/swegym_train_293.jsonl}")" +SMOKE_NUM_ROWS="${SMOKE_NUM_ROWS:-1}" +RUN_DIR="$(abs_path "${RUN_DIR:-${PROJECT_ROOT}/tmp/${RUN_ID}}")" +RUN_LOG_DIR="$(abs_path "${RUN_LOG_DIR:-${RUN_DIR}/logs}")" +ROLLOUT_SAVE_DIR="$(abs_path "${ROLLOUT_SAVE_DIR:-${RUN_DIR}/rollout_results}")" +PROMPT_DATA="$(abs_path "${PROMPT_DATA:-${RUN_DIR}/swegym_train_${SMOKE_NUM_ROWS}.jsonl}")" + +# Training/runtime container and dependency checkouts. +TRAIN_SQSH_DEFAULT="$(first_existing_file \ + "${PROJECT_ROOT}/tmp/polr_swegym_slime_grpo_train.sqsh" \ + "/lustre/fs1/portfolios/llmservice/projects/llmservice_fm_vision/users/shaokunz/docker/polr_swegym_slime_grpo_train_codex_yazi_v2.sqsh")" +TRAIN_SQSH="$(abs_path "${TRAIN_SQSH:-${POLR_TRAIN_SQSH:-${TRAIN_SQSH_DEFAULT}}}")" +USE_TRAIN_SQSH="${USE_TRAIN_SQSH:-1}" +SLIME_DIR="$(abs_path "${SLIME_DIR:-$(first_existing_dir "${PROJECT_ROOT}/slime" "${REFERENCE_ROOT}/slime")}")" +MEGATRON_DIR="$(abs_path "${MEGATRON_DIR:-$(first_existing_dir "${PROJECT_ROOT}/Megatron-LM" "${REFERENCE_ROOT}/Megatron-LM")}")" + +# SWE-Gym task SIFs and shared Node/agent CLI assets. +SHARED_SIF_DIR="${SHARED_SIF_DIR:-/lustre/fs1/portfolios/nvr/projects/nvr_lpr_agentic/users/haozh/singularity_images_v3}" +APPTAINER_IMAGE_DIR="$(abs_path "${APPTAINER_IMAGE_DIR:-${PROJECT_ROOT}/tmp/swegym_apptainer_images}")" +AGENT_CLI_DIR="$(abs_path "${AGENT_CLI_DIR:-$(choose_agent_cli_dir)}")" +PREPARE_AGENT_CLI="${PREPARE_AGENT_CLI:-0}" +PREPARE_MISSING_SIFS="${PREPARE_MISSING_SIFS:-0}" + +# GPU split. Current example uses 2 actor GPUs and the rest for rollout. +DETECTED_GPUS="$(detect_gpu_count)" +TOTAL_GPUS="${TOTAL_GPUS:-${DETECTED_GPUS:-4}}" +[ "${TOTAL_GPUS}" -gt 0 ] || TOTAL_GPUS=4 +DETECTED_CPUS="$(detect_cpu_count)" +RAY_NUM_CPUS="${RAY_NUM_CPUS:-${DETECTED_CPUS}}" +if [ "${RAY_NUM_CPUS}" -gt 32 ]; then + RAY_NUM_CPUS=32 +fi +ACTOR_NUM_NODES="${ACTOR_NUM_NODES:-1}" +TRAIN_NUM_GPUS="${TRAIN_NUM_GPUS:-2}" +ACTOR_NUM_GPUS_PER_NODE="${ACTOR_NUM_GPUS_PER_NODE:-}" +ROLLOUT_NUM_GPUS="${ROLLOUT_NUM_GPUS:-$((TOTAL_GPUS - TRAIN_NUM_GPUS))}" +ROLLOUT_NUM_GPUS_PER_ENGINE="${ROLLOUT_NUM_GPUS_PER_ENGINE:-1}" +TENSOR_MODEL_PARALLEL_SIZE="${TENSOR_MODEL_PARALLEL_SIZE:-2}" + +# Smoke defaults are intentionally small; full-run defaults mirror current run.sh. +if [ "${SMOKE_NUM_ROWS}" -gt 0 ]; then + ROLLOUT_BATCH_SIZE="${ROLLOUT_BATCH_SIZE:-1}" + N_SAMPLES_PER_PROMPT="${N_SAMPLES_PER_PROMPT:-1}" + NUM_ROLLOUT="${NUM_ROLLOUT:-1}" + SAVE_INTERVAL="${SAVE_INTERVAL:-1}" + POLAR_TASK_TIMEOUT_SECONDS="${POLAR_TASK_TIMEOUT_SECONDS:-1200}" + POLAR_REQUEST_TIMEOUT="${POLAR_REQUEST_TIMEOUT:-1200}" +else + ROLLOUT_BATCH_SIZE="${ROLLOUT_BATCH_SIZE:-4}" + N_SAMPLES_PER_PROMPT="${N_SAMPLES_PER_PROMPT:-16}" + NUM_ROLLOUT="${NUM_ROLLOUT:-}" + SAVE_INTERVAL="${SAVE_INTERVAL:-5}" + POLAR_TASK_TIMEOUT_SECONDS="${POLAR_TASK_TIMEOUT_SECONDS:-1200}" + POLAR_REQUEST_TIMEOUT="${POLAR_REQUEST_TIMEOUT:-1200}" +fi +NUM_EPOCH="${NUM_EPOCH:-1}" +NUM_STEPS_PER_ROLLOUT="${NUM_STEPS_PER_ROLLOUT:-1}" +DISTRIBUTED_TIMEOUT_MINUTES="${DISTRIBUTED_TIMEOUT_MINUTES:-180}" +MAX_TOKENS_PER_GPU="${MAX_TOKENS_PER_GPU:-60000}" +ROLLOUT_MAX_RESPONSE_LEN="${ROLLOUT_MAX_RESPONSE_LEN:-16000}" +ROLLOUT_MAX_PROMPT_LEN="${ROLLOUT_MAX_PROMPT_LEN:-32000}" +SGLANG_CONTEXT_LENGTH="${SGLANG_CONTEXT_LENGTH:-50000}" +SGLANG_MEM_FRACTION_STATIC="${SGLANG_MEM_FRACTION_STATIC:-0.8}" +SGLANG_LOG_LEVEL="${SGLANG_LOG_LEVEL:-warning}" +TRAIN_LR="${TRAIN_LR:-1e-6}" +CLIP_GRAD="${CLIP_GRAD:-1.0}" +KL_LOSS_COEF="${KL_LOSS_COEF:-0.001}" +EPS_CLIP="${EPS_CLIP:-0.2}" +EPS_CLIP_HIGH="${EPS_CLIP_HIGH:-0.28}" + +# Polar/agent settings. PI_API_TYPE defaults to pi-ai's OpenAI chat-completions provider. +POLAR_BUILDER_STRATEGY="${POLAR_BUILDER_STRATEGY:-prefix_merging}" +POLAR_MIN_COMPLETE_ACCEPT_FRACTION="${POLAR_MIN_COMPLETE_ACCEPT_FRACTION:-0.6}" +if [ "${SMOKE_NUM_ROWS}" -gt 0 ]; then + POLAR_MAX_ASYNC_LEVEL="${POLAR_MAX_ASYNC_LEVEL:-1}" +else + POLAR_MAX_ASYNC_LEVEL="${POLAR_MAX_ASYNC_LEVEL:-2}" +fi +# Optional per-command Apptainer sandbox memory cap. Empty disables the cap. +POLAR_RUNTIME_MEMORY_MB="${POLAR_RUNTIME_MEMORY_MB:-}" +PI_API_TYPE="${PI_API_TYPE:-openai-completions}" +PI_MAX_TOKENS="${PI_MAX_TOKENS:-512}" +PI_CONTEXT_WINDOW="${PI_CONTEXT_WINDOW:-32000}" +if [ "${PI_CONTEXT_WINDOW}" -le 0 ]; then + fail "PI_CONTEXT_WINDOW must be positive; got ${PI_CONTEXT_WINDOW}" +fi +PI_THINKING="${PI_THINKING:-}" +PI_FAIL_ON_CONTEXT_LIMIT="${PI_FAIL_ON_CONTEXT_LIMIT:-1}" +if [ "${PI_FAIL_ON_CONTEXT_LIMIT}" = "1" ]; then + PI_COMPACTION_ENABLED="${PI_COMPACTION_ENABLED:-false}" + PI_RETRY_ENABLED="${PI_RETRY_ENABLED:-false}" +else + PI_COMPACTION_ENABLED="${PI_COMPACTION_ENABLED:-true}" + PI_RETRY_ENABLED="${PI_RETRY_ENABLED:-true}" +fi +PI_RETRY_MAX_RETRIES="${PI_RETRY_MAX_RETRIES:-0}" +PI_PROVIDER_MAX_RETRIES="${PI_PROVIDER_MAX_RETRIES:-0}" + +# Local ports. Override these for parallel runs on one node. +ROLLOUT_PORT="${ROLLOUT_PORT:-18080}" +GATEWAY_PORT="${GATEWAY_PORT:-18100}" +SGLANG_ROUTER_PORT="${SGLANG_ROUTER_PORT:-26000}" +SLIME_SGLANG_BASE_PORT="${SLIME_SGLANG_BASE_PORT:-24000}" +RAY_PORT="${RAY_PORT:-6379}" +RAY_DASHBOARD_PORT="${RAY_DASHBOARD_PORT:-28265}" +RAY_USE_EXISTING_CLUSTER="${RAY_USE_EXISTING_CLUSTER:-0}" +RAY_STOP_ON_EXIT="${RAY_STOP_ON_EXIT:-1}" +USE_RAY_JOB_SUBMIT="${USE_RAY_JOB_SUBMIT:-0}" +RAY_JOB_ADDRESS="${RAY_JOB_ADDRESS:-http://127.0.0.1:${RAY_DASHBOARD_PORT}}" +RAY_TMPDIR="$(abs_path "${RAY_TMPDIR:-/tmp/polar-ray-${USER:-user}-$$}")" +SGLANG_ROUTER_HOST="${SGLANG_ROUTER_HOST:-$(detect_host_ip)}" +SGLANG_ROUTER_BASE_URL="${SGLANG_ROUTER_BASE_URL:-http://${SGLANG_ROUTER_HOST}:${SGLANG_ROUTER_PORT}}" + +# Apptainer caches. Keep these short because Ray uses Unix sockets under tmp. +POLAR_APPTAINER_BIN="${POLAR_APPTAINER_BIN:-/usr/bin/apptainer}" +POLAR_APPTAINER_DIRECT_EXEC="${POLAR_APPTAINER_DIRECT_EXEC:-1}" +POLAR_JOB_CACHE_ROOT="${POLAR_JOB_CACHE_ROOT:-/tmp/polar-swegym-pi-${USER:-user}-${RUN_ID}}" +APPTAINER_CACHEDIR="${APPTAINER_CACHEDIR:-${POLAR_JOB_CACHE_ROOT}/apptainer-cache}" +APPTAINER_TMPDIR="${APPTAINER_TMPDIR:-${POLAR_JOB_CACHE_ROOT}/apptainer-tmp}" +APPTAINER_WORKDIR="${APPTAINER_WORKDIR:-${POLAR_JOB_CACHE_ROOT}/apptainer-work}" +TRITON_CACHE_DIR="${TRITON_CACHE_DIR:-${POLAR_JOB_CACHE_ROOT}/triton-cache}" +TRITON_HOME="${TRITON_HOME:-${POLAR_JOB_CACHE_ROOT}/triton-home}" +TORCHINDUCTOR_CACHE_DIR="${TORCHINDUCTOR_CACHE_DIR:-${POLAR_JOB_CACHE_ROOT}/torchinductor}" +TORCH_EXTENSIONS_DIR="${TORCH_EXTENSIONS_DIR:-${POLAR_JOB_CACHE_ROOT}/torch-extensions}" +XDG_CACHE_HOME="${POLAR_XDG_CACHE_HOME:-${POLAR_JOB_CACHE_ROOT}/xdg-cache}" +XDG_CONFIG_HOME="${POLAR_XDG_CONFIG_HOME:-${POLAR_JOB_CACHE_ROOT}/xdg-config}" +XDG_RUNTIME_DIR="${POLAR_XDG_RUNTIME_DIR:-${POLAR_JOB_CACHE_ROOT}/xdg-runtime}" +CUDA_CACHE_PATH="${CUDA_CACHE_PATH:-${POLAR_JOB_CACHE_ROOT}/cuda-cache}" +NUMBA_CACHE_DIR="${NUMBA_CACHE_DIR:-${POLAR_JOB_CACHE_ROOT}/numba}" +FLASHINFER_WORKSPACE_DIR="${FLASHINFER_WORKSPACE_DIR:-${POLAR_JOB_CACHE_ROOT}/flashinfer-cache}" +HOST_NVIDIA_LIB_DIR="$(abs_path "${HOST_NVIDIA_LIB_DIR:-${PROJECT_ROOT}/tmp/host-nvidia-libs}")" + +# W&B is enabled by default. The key is passed through env only and is not echoed. +DEFAULT_WANDB_API_KEY="" +USE_WANDB="${USE_WANDB:-1}" +WANDB_MODE="${WANDB_MODE:-online}" +WANDB_PROJECT="${WANDB_PROJECT:-polar-swegym-pi-qwen35-4b}" +WANDB_GROUP="${WANDB_GROUP:-swegym-pi-qwen35-4b-full293-t1200}" +WANDB_RUN_ID="${WANDB_RUN_ID:-${RUN_ID}}" +WANDB_RANDOM_SUFFIX="${WANDB_RANDOM_SUFFIX:-0}" +WANDB_TEAM="${WANDB_TEAM:-${WANDB_ENTITY:-}}" +WANDB_HOST="${WANDB_HOST:-}" +WANDB_API_KEY="${WANDB_API_KEY:-${WANDB_KEY:-${DEFAULT_WANDB_API_KEY}}}" +WANDB_DIR="$(abs_path "${WANDB_DIR:-${PROJECT_ROOT}/logs/wandb}")" + +# Preparation and diagnostics. +PATCH_SLIME="${PATCH_SLIME:-1}" +PATCH_SGLANG="${PATCH_SGLANG:-1}" +PATCH_RAY_PY312="${PATCH_RAY_PY312:-1}" +CONVERT_WEIGHTS="${CONVERT_WEIGHTS:-0}" +SLIME_ZERO_NONFINITE_GRADS="${SLIME_ZERO_NONFINITE_GRADS:-1}" +DRY_RUN="${DRY_RUN:-0}" + +resolve_gpu_split() { + [ "${TRAIN_NUM_GPUS}" -gt 0 ] || die "TRAIN_NUM_GPUS must be positive" + [ "${ROLLOUT_NUM_GPUS}" -gt 0 ] || die "ROLLOUT_NUM_GPUS must be positive" + [ "$((TRAIN_NUM_GPUS + ROLLOUT_NUM_GPUS))" -le "${TOTAL_GPUS}" ] || \ + die "TRAIN_NUM_GPUS + ROLLOUT_NUM_GPUS exceeds TOTAL_GPUS: train=${TRAIN_NUM_GPUS}, rollout=${ROLLOUT_NUM_GPUS}, total=${TOTAL_GPUS}" + + if [ -z "${ACTOR_NUM_GPUS_PER_NODE}" ]; then + [ "$((TRAIN_NUM_GPUS % ACTOR_NUM_NODES))" -eq 0 ] || \ + die "TRAIN_NUM_GPUS (${TRAIN_NUM_GPUS}) must be divisible by ACTOR_NUM_NODES (${ACTOR_NUM_NODES})" + ACTOR_NUM_GPUS_PER_NODE="$((TRAIN_NUM_GPUS / ACTOR_NUM_NODES))" + fi + [ "$((TRAIN_NUM_GPUS % TENSOR_MODEL_PARALLEL_SIZE))" -eq 0 ] || \ + die "TRAIN_NUM_GPUS (${TRAIN_NUM_GPUS}) must be divisible by TENSOR_MODEL_PARALLEL_SIZE (${TENSOR_MODEL_PARALLEL_SIZE})" +} + +host_library_path() { + local lib="$1" + ldconfig -p 2>/dev/null | awk -v lib="${lib}" '$1 == lib { print $NF; exit }' +} + +prepare_host_nvidia_libs() { + local lib source target + mkdir -p "${HOST_NVIDIA_LIB_DIR}" + for lib in libcuda.so.1 libnvidia-ml.so.1; do + source="$(host_library_path "${lib}")" + if [ -n "${source}" ] && [ -f "${source}" ]; then + target="${HOST_NVIDIA_LIB_DIR}/${lib}" + if [ ! -f "${target}" ] || ! cmp -s "${source}" "${target}"; then + cp -Lf "${source}" "${target}" + fi + chmod 644 "${target}" || true + fi + done +} + +maybe_reexec_in_training_sqsh() { + if [ "${USE_TRAIN_SQSH}" != "1" ] || [ "${INSIDE_TRAIN_SQSH:-0}" = "1" ]; then + return + fi + [ -f "${TRAIN_SQSH}" ] || die "training sqsh not found: ${TRAIN_SQSH}" + command -v apptainer >/dev/null 2>&1 || die "host apptainer not found" + + mkdir -p "${POLAR_JOB_CACHE_ROOT}/home" "${APPTAINER_CACHEDIR}" "${APPTAINER_TMPDIR}" "${APPTAINER_WORKDIR}" \ + "${TRITON_CACHE_DIR}" "${TRITON_HOME}" "${TORCHINDUCTOR_CACHE_DIR}" "${TORCH_EXTENSIONS_DIR}" \ + "${XDG_CACHE_HOME}" "${XDG_CONFIG_HOME}" "${XDG_RUNTIME_DIR}" "${CUDA_CACHE_PATH}" \ + "${NUMBA_CACHE_DIR}" "${FLASHINFER_WORKSPACE_DIR}" "${HOST_NVIDIA_LIB_DIR}" + chmod 700 "${POLAR_JOB_CACHE_ROOT}/home" "${XDG_RUNTIME_DIR}" || true + prepare_host_nvidia_libs + + local bind_args=(--bind /lustre/fs1:/lustre/fs1) + if [ -d /lustre/fsw ]; then + bind_args+=(--bind /lustre/fsw:/lustre/fsw) + fi + local host_nvidia_ld="" + local host_nvidia_preload="" + if [ -f "${HOST_NVIDIA_LIB_DIR}/libnvidia-ml.so.1" ]; then + bind_args+=(--bind "${HOST_NVIDIA_LIB_DIR}:/host-nvidia-libs:ro") + host_nvidia_ld="/host-nvidia-libs:" + if [ -f "${HOST_NVIDIA_LIB_DIR}/libcuda.so.1" ]; then + host_nvidia_preload="/host-nvidia-libs/libcuda.so.1" + fi + fi + + local container_path="/opt/polr_venv/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" + local container_ld="${host_nvidia_ld}/usr/local/cuda/compat/lib.real:/usr/lib/x86_64-linux-gnu:/usr/local/cuda/compat/lib:/usr/local/cuda/lib64:${LD_LIBRARY_PATH:-}" + export XDG_RUNTIME_DIR + export APPTAINERENV_PATH="${container_path}" + export APPTAINERENV_LD_LIBRARY_PATH="${container_ld}" + export APPTAINERENV_LD_PRELOAD="${host_nvidia_preload}" + export APPTAINERENV_PYTHONNOUSERSITE=1 + export APPTAINERENV_XDG_RUNTIME_DIR="${XDG_RUNTIME_DIR}" + export APPTAINERENV_XDG_CACHE_HOME="${XDG_CACHE_HOME}" + export APPTAINERENV_XDG_CONFIG_HOME="${XDG_CONFIG_HOME}" + + log "Re-entering through training sqsh: ${TRAIN_SQSH}" + exec apptainer exec --nv --writable-tmpfs --cleanenv --no-home --pwd "${PROJECT_ROOT}" \ + "${bind_args[@]}" \ + --env "PATH=${container_path}" \ + --env "LD_LIBRARY_PATH=${container_ld}" \ + --env "LD_PRELOAD=${host_nvidia_preload}" \ + --env "PYTHONNOUSERSITE=1" \ + --env "XDG_RUNTIME_DIR=${XDG_RUNTIME_DIR}" \ + --env "XDG_CACHE_HOME=${XDG_CACHE_HOME}" \ + --env "XDG_CONFIG_HOME=${XDG_CONFIG_HOME}" \ + --env "APPTAINER_CACHEDIR=${APPTAINER_CACHEDIR}" \ + --env "APPTAINER_TMPDIR=${APPTAINER_TMPDIR}" \ + --env "APPTAINER_WORKDIR=${APPTAINER_WORKDIR}" \ + --env "SINGULARITY_CACHEDIR=${APPTAINER_CACHEDIR}" \ + --env "SINGULARITY_TMPDIR=${APPTAINER_TMPDIR}" \ + --env "TRITON_CACHE_DIR=${TRITON_CACHE_DIR}" \ + --env "TRITON_HOME=${TRITON_HOME}" \ + --env "TORCHINDUCTOR_CACHE_DIR=${TORCHINDUCTOR_CACHE_DIR}" \ + --env "TORCH_EXTENSIONS_DIR=${TORCH_EXTENSIONS_DIR}" \ + --env "CUDA_CACHE_PATH=${CUDA_CACHE_PATH}" \ + --env "NUMBA_CACHE_DIR=${NUMBA_CACHE_DIR}" \ + --env "FLASHINFER_WORKSPACE_DIR=${FLASHINFER_WORKSPACE_DIR}" \ + "${TRAIN_SQSH}" \ + env \ + INSIDE_TRAIN_SQSH=1 \ + USE_TRAIN_SQSH=1 \ + REFERENCE_ROOT="${REFERENCE_ROOT}" \ + RUN_ID="${RUN_ID}" \ + MODEL_NAME="${MODEL_NAME}" \ + PI_MODEL_NAME="${PI_MODEL_NAME}" \ + HF_CHECKPOINT="${HF_CHECKPOINT}" \ + REF_LOAD="${REF_LOAD}" \ + SAVE_DIR="${SAVE_DIR}" \ + FRESH_START="${FRESH_START}" \ + RESUME_FROM_SAVE="${RESUME_FROM_SAVE}" \ + START_ROLLOUT_ID="${START_ROLLOUT_ID}" \ + FULL_PROMPT_DATA="${FULL_PROMPT_DATA}" \ + SMOKE_NUM_ROWS="${SMOKE_NUM_ROWS}" \ + RUN_DIR="${RUN_DIR}" \ + RUN_LOG_DIR="${RUN_LOG_DIR}" \ + ROLLOUT_SAVE_DIR="${ROLLOUT_SAVE_DIR}" \ + PROMPT_DATA="${PROMPT_DATA}" \ + TRAIN_SQSH="${TRAIN_SQSH}" \ + SLIME_DIR="${SLIME_DIR}" \ + MEGATRON_DIR="${MEGATRON_DIR}" \ + SHARED_SIF_DIR="${SHARED_SIF_DIR}" \ + APPTAINER_IMAGE_DIR="${APPTAINER_IMAGE_DIR}" \ + AGENT_CLI_DIR="${AGENT_CLI_DIR}" \ + PREPARE_AGENT_CLI="${PREPARE_AGENT_CLI}" \ + PREPARE_MISSING_SIFS="${PREPARE_MISSING_SIFS}" \ + TOTAL_GPUS="${TOTAL_GPUS}" \ + RAY_NUM_CPUS="${RAY_NUM_CPUS}" \ + TRAIN_NUM_GPUS="${TRAIN_NUM_GPUS}" \ + ACTOR_NUM_NODES="${ACTOR_NUM_NODES}" \ + ACTOR_NUM_GPUS_PER_NODE="${ACTOR_NUM_GPUS_PER_NODE}" \ + ROLLOUT_NUM_GPUS="${ROLLOUT_NUM_GPUS}" \ + ROLLOUT_NUM_GPUS_PER_ENGINE="${ROLLOUT_NUM_GPUS_PER_ENGINE}" \ + TENSOR_MODEL_PARALLEL_SIZE="${TENSOR_MODEL_PARALLEL_SIZE}" \ + ROLLOUT_BATCH_SIZE="${ROLLOUT_BATCH_SIZE}" \ + N_SAMPLES_PER_PROMPT="${N_SAMPLES_PER_PROMPT}" \ + NUM_ROLLOUT="${NUM_ROLLOUT}" \ + NUM_EPOCH="${NUM_EPOCH}" \ + NUM_STEPS_PER_ROLLOUT="${NUM_STEPS_PER_ROLLOUT}" \ + DISTRIBUTED_TIMEOUT_MINUTES="${DISTRIBUTED_TIMEOUT_MINUTES}" \ + SAVE_INTERVAL="${SAVE_INTERVAL}" \ + MAX_TOKENS_PER_GPU="${MAX_TOKENS_PER_GPU}" \ + ROLLOUT_MAX_RESPONSE_LEN="${ROLLOUT_MAX_RESPONSE_LEN}" \ + ROLLOUT_MAX_PROMPT_LEN="${ROLLOUT_MAX_PROMPT_LEN}" \ + SGLANG_CONTEXT_LENGTH="${SGLANG_CONTEXT_LENGTH}" \ + SGLANG_MEM_FRACTION_STATIC="${SGLANG_MEM_FRACTION_STATIC}" \ + SGLANG_LOG_LEVEL="${SGLANG_LOG_LEVEL}" \ + TRAIN_LR="${TRAIN_LR}" \ + CLIP_GRAD="${CLIP_GRAD}" \ + KL_LOSS_COEF="${KL_LOSS_COEF}" \ + EPS_CLIP="${EPS_CLIP}" \ + EPS_CLIP_HIGH="${EPS_CLIP_HIGH}" \ + POLAR_BUILDER_STRATEGY="${POLAR_BUILDER_STRATEGY}" \ + POLAR_MIN_COMPLETE_ACCEPT_FRACTION="${POLAR_MIN_COMPLETE_ACCEPT_FRACTION}" \ + POLAR_MAX_ASYNC_LEVEL="${POLAR_MAX_ASYNC_LEVEL}" \ + POLAR_RUNTIME_MEMORY_MB="${POLAR_RUNTIME_MEMORY_MB}" \ + POLAR_TASK_TIMEOUT_SECONDS="${POLAR_TASK_TIMEOUT_SECONDS}" \ + POLAR_REQUEST_TIMEOUT="${POLAR_REQUEST_TIMEOUT}" \ + PI_API_TYPE="${PI_API_TYPE}" \ + PI_CONTEXT_WINDOW="${PI_CONTEXT_WINDOW}" \ + PI_MAX_TOKENS="${PI_MAX_TOKENS}" \ + PI_THINKING="${PI_THINKING}" \ + PI_FAIL_ON_CONTEXT_LIMIT="${PI_FAIL_ON_CONTEXT_LIMIT}" \ + PI_COMPACTION_ENABLED="${PI_COMPACTION_ENABLED}" \ + PI_RETRY_ENABLED="${PI_RETRY_ENABLED}" \ + PI_RETRY_MAX_RETRIES="${PI_RETRY_MAX_RETRIES}" \ + PI_PROVIDER_MAX_RETRIES="${PI_PROVIDER_MAX_RETRIES}" \ + ROLLOUT_PORT="${ROLLOUT_PORT}" \ + GATEWAY_PORT="${GATEWAY_PORT}" \ + SGLANG_ROUTER_PORT="${SGLANG_ROUTER_PORT}" \ + SLIME_SGLANG_BASE_PORT="${SLIME_SGLANG_BASE_PORT}" \ + RAY_PORT="${RAY_PORT}" \ + RAY_DASHBOARD_PORT="${RAY_DASHBOARD_PORT}" \ + RAY_USE_EXISTING_CLUSTER="${RAY_USE_EXISTING_CLUSTER}" \ + RAY_STOP_ON_EXIT="${RAY_STOP_ON_EXIT}" \ + USE_RAY_JOB_SUBMIT="${USE_RAY_JOB_SUBMIT}" \ + RAY_JOB_ADDRESS="${RAY_JOB_ADDRESS}" \ + RAY_ADDRESS="${RAY_ADDRESS:-}" \ + RAY_TMPDIR="${RAY_TMPDIR}" \ + SGLANG_ROUTER_BASE_URL="${SGLANG_ROUTER_BASE_URL}" \ + POLAR_APPTAINER_BIN="${POLAR_APPTAINER_BIN}" \ + POLAR_APPTAINER_DIRECT_EXEC="${POLAR_APPTAINER_DIRECT_EXEC}" \ + APPTAINER_CACHEDIR="${APPTAINER_CACHEDIR}" \ + APPTAINER_TMPDIR="${APPTAINER_TMPDIR}" \ + APPTAINER_WORKDIR="${APPTAINER_WORKDIR}" \ + TRITON_CACHE_DIR="${TRITON_CACHE_DIR}" \ + TRITON_HOME="${TRITON_HOME}" \ + TORCHINDUCTOR_CACHE_DIR="${TORCHINDUCTOR_CACHE_DIR}" \ + TORCH_EXTENSIONS_DIR="${TORCH_EXTENSIONS_DIR}" \ + XDG_CACHE_HOME="${XDG_CACHE_HOME}" \ + XDG_CONFIG_HOME="${XDG_CONFIG_HOME}" \ + XDG_RUNTIME_DIR="${XDG_RUNTIME_DIR}" \ + CUDA_CACHE_PATH="${CUDA_CACHE_PATH}" \ + NUMBA_CACHE_DIR="${NUMBA_CACHE_DIR}" \ + FLASHINFER_WORKSPACE_DIR="${FLASHINFER_WORKSPACE_DIR}" \ + HOST_NVIDIA_LIB_DIR="${HOST_NVIDIA_LIB_DIR}" \ + PATCH_SLIME="${PATCH_SLIME}" \ + PATCH_SGLANG="${PATCH_SGLANG}" \ + PATCH_RAY_PY312="${PATCH_RAY_PY312}" \ + CONVERT_WEIGHTS="${CONVERT_WEIGHTS}" \ + SLIME_ZERO_NONFINITE_GRADS="${SLIME_ZERO_NONFINITE_GRADS}" \ + USE_WANDB="${USE_WANDB}" \ + WANDB_MODE="${WANDB_MODE}" \ + WANDB_PROJECT="${WANDB_PROJECT}" \ + WANDB_GROUP="${WANDB_GROUP}" \ + WANDB_RUN_ID="${WANDB_RUN_ID}" \ + WANDB_RANDOM_SUFFIX="${WANDB_RANDOM_SUFFIX}" \ + WANDB_TEAM="${WANDB_TEAM}" \ + WANDB_HOST="${WANDB_HOST}" \ + WANDB_API_KEY="${WANDB_API_KEY}" \ + WANDB_DIR="${WANDB_DIR}" \ + DRY_RUN="${DRY_RUN}" \ + HOME="${POLAR_JOB_CACHE_ROOT}/home" \ + PATH="${container_path}" \ + LD_LIBRARY_PATH="${container_ld}" \ + PYTHONNOUSERSITE=1 \ + /usr/bin/bash "$0" +} + +resolve_gpu_split +maybe_reexec_in_training_sqsh + +if [ -x /opt/polr_venv/bin/python ]; then + PYTHON_BIN="${PYTHON_BIN:-/opt/polr_venv/bin/python}" +else + PYTHON_BIN="${PYTHON_BIN:-${PROJECT_ROOT}/.venv/bin/python}" +fi +[ -x "${PYTHON_BIN}" ] || PYTHON_BIN="$(command -v python3 || command -v python)" +PYTHON_BIN_DIR="$(cd -- "$(dirname -- "${PYTHON_BIN}")" &>/dev/null && pwd)" + +mkdir -p "${RUN_DIR}" "${SAVE_DIR}" "${PROJECT_ROOT}/logs" "${WANDB_DIR}" \ + "${RUN_LOG_DIR}" "${ROLLOUT_SAVE_DIR}" "${RAY_TMPDIR}" "${APPTAINER_IMAGE_DIR}" \ + "${APPTAINER_CACHEDIR}" "${APPTAINER_TMPDIR}" "${APPTAINER_WORKDIR}" \ + "${TRITON_CACHE_DIR}" "${TRITON_HOME}" "${TORCHINDUCTOR_CACHE_DIR}" "${TORCH_EXTENSIONS_DIR}" \ + "${XDG_CACHE_HOME}" "${XDG_CONFIG_HOME}" "${XDG_RUNTIME_DIR}" "${CUDA_CACHE_PATH}" \ + "${NUMBA_CACHE_DIR}" "${FLASHINFER_WORKSPACE_DIR}" "${HOST_NVIDIA_LIB_DIR}" +chmod 700 "${XDG_RUNTIME_DIR}" || true +ulimit -n 1048576 >/dev/null 2>&1 || ulimit -n 65536 >/dev/null 2>&1 || true + +export PYTHONNOUSERSITE=1 +export PYTHONPATH="${MEGATRON_DIR}:${SLIME_DIR}:${PROJECT_ROOT}/src:${PYTHONPATH:-}" +export HF_CHECKPOINT MODEL_NAME PI_MODEL_NAME REF_LOAD SAVE_DIR +export POLAR_APPTAINER_BIN POLAR_APPTAINER_DIRECT_EXEC +export SLIME_ZERO_NONFINITE_GRADS SLIME_SGLANG_BASE_PORT +export APPTAINER_CACHEDIR APPTAINER_TMPDIR APPTAINER_WORKDIR +export SINGULARITY_CACHEDIR="${APPTAINER_CACHEDIR}" +export SINGULARITY_TMPDIR="${APPTAINER_TMPDIR}" +export TRITON_CACHE_DIR TRITON_HOME TORCHINDUCTOR_CACHE_DIR TORCH_EXTENSIONS_DIR +export XDG_CACHE_HOME XDG_CONFIG_HOME XDG_RUNTIME_DIR CUDA_CACHE_PATH NUMBA_CACHE_DIR FLASHINFER_WORKSPACE_DIR +export WANDB_MODE WANDB_PROJECT WANDB_GROUP WANDB_RUN_ID WANDB_DIR WANDB_API_KEY +export HOST_NVIDIA_LIB_DIR +if [ -f /host-nvidia-libs/libcuda.so.1 ]; then + export LD_PRELOAD="/host-nvidia-libs/libcuda.so.1${LD_PRELOAD:+:${LD_PRELOAD}}" +fi + +prepare_prompt_data_and_sifs() { + "${PYTHON_BIN}" - "${FULL_PROMPT_DATA}" "${PROMPT_DATA}" "${SMOKE_NUM_ROWS}" \ + "${APPTAINER_IMAGE_DIR}" "${SHARED_SIF_DIR}" "${PREPARE_MISSING_SIFS}" <<'PY' +import base64 +import json +import os +import subprocess +import sys +from pathlib import Path + +full_prompt = Path(sys.argv[1]) +prompt_out = Path(sys.argv[2]) +smoke_rows = int(sys.argv[3]) +image_dir = Path(sys.argv[4]) +shared_dir = Path(sys.argv[5]) +prepare_missing = sys.argv[6] == "1" + +script_dir = full_prompt.parent +sys.path.insert(0, str(script_dir)) +from sample_tasks import registry_image_for_instance_id # noqa: E402 + +rows = [json.loads(line) for line in full_prompt.read_text().splitlines() if line.strip()] +if smoke_rows > 0: + rows = rows[:smoke_rows] + +prompt_out.parent.mkdir(parents=True, exist_ok=True) +prompt_out.write_text("\n".join(json.dumps(row, ensure_ascii=True) for row in rows) + "\n") +image_dir.mkdir(parents=True, exist_ok=True) + +missing: list[tuple[str, Path]] = [] +for row in rows: + instance_id = str(row["metadata"]["instance_id"]) + target = image_dir / f"{instance_id}.sif" + if target.exists() or target.is_symlink(): + continue + image_ref = registry_image_for_instance_id(instance_id) + shared_name = image_ref.split(":", 1)[0].replace("/", "_") + ".sif" + source = shared_dir / shared_name + if source.is_file(): + target.symlink_to(source) + continue + missing.append((instance_id, source)) + +if missing and prepare_missing: + for instance_id, _ in missing: + subprocess.run( + [ + sys.executable, + str(script_dir / "prepare_apptainer_images.py"), + "--instance-id", + instance_id, + "--image-dir", + str(image_dir), + "--cache-dir", + os.environ.get("APPTAINER_CACHEDIR", "tmp/apptainer_cache"), + "--tmp-dir", + os.environ.get("APPTAINER_TMPDIR", "tmp/apptainer_tmp"), + "--skip-cli", + ], + check=True, + ) +elif missing: + print("Missing runtime SIF(s):", file=sys.stderr) + for instance_id, source in missing[:20]: + print(f" {instance_id}: expected shared source {source}", file=sys.stderr) + raise SystemExit( + "Set PREPARE_MISSING_SIFS=1 to pull missing runtime images, or provide SHARED_SIF_DIR." + ) + +print(f"Prompt data: {prompt_out} ({len(rows)} row(s))") +print(f"Runtime SIF dir: {image_dir}") +PY +} + +prepare_agent_cli_if_needed() { + if [ -x "${AGENT_CLI_DIR}/bin/pi" ] && [ -x "${AGENT_CLI_DIR}/bin/node" ]; then + log "Agent CLI dir ready: ${AGENT_CLI_DIR}" + return + fi + if [ "${PREPARE_AGENT_CLI}" != "1" ]; then + die "PI CLI missing at ${AGENT_CLI_DIR}; set AGENT_CLI_DIR to a prepared opt_node or PREPARE_AGENT_CLI=1" + fi + log "Preparing agent CLI dir: ${AGENT_CLI_DIR}" + "${PYTHON_BIN}" - "${SCRIPT_DIR}" "${AGENT_CLI_DIR}" <<'PY' +from pathlib import Path +import sys + +script_dir = Path(sys.argv[1]) +agent_cli_dir = Path(sys.argv[2]) +sys.path.insert(0, str(script_dir)) + +from prepare_apptainer_images import ensure_agent_cli_dir # noqa: E402 + +ensure_agent_cli_dir(agent_cli_dir, force=False) +PY +} + +render_runtime_configs() { + TOPOLOGY_PATH="${RUN_DIR}/topology.yaml" + CUSTOM_CONFIG_PATH="${RUN_DIR}/polar_config.yaml" + export TOPOLOGY_PATH CUSTOM_CONFIG_PATH + "${PYTHON_BIN}" - "${SCRIPT_DIR}/topology.yaml" "${TOPOLOGY_PATH}" \ + "${SCRIPT_DIR}/polar_config.yaml" "${CUSTOM_CONFIG_PATH}" <<'PY' +from pathlib import Path +import base64 +import json +import os +import shlex +import sys +import yaml + +topology_in, topology_out, config_in, config_out = sys.argv[1:] + +with open(topology_in, encoding="utf-8") as fh: + topology = yaml.safe_load(fh) or {} + +topology["rollout"]["port"] = int(os.environ["ROLLOUT_PORT"]) +topology["rollout"]["public_url"] = f"http://127.0.0.1:{os.environ['ROLLOUT_PORT']}" +topology["rollout"]["save_dir"] = os.environ["ROLLOUT_SAVE_DIR"] +nodes = topology.get("gateway", {}).get("nodes", []) +if not nodes: + raise SystemExit("topology has no gateway nodes") +nodes[:] = nodes[:1] +nodes[0]["id"] = "localhost-node-01" +nodes[0]["port"] = int(os.environ["GATEWAY_PORT"]) +nodes[0]["public_url"] = f"http://127.0.0.1:{os.environ['GATEWAY_PORT']}" +nodes[0]["model_served"] = os.environ["MODEL_NAME"] +nodes[0].setdefault("sglang", {})["base_url"] = os.environ["SGLANG_ROUTER_BASE_URL"] + +Path(topology_out).parent.mkdir(parents=True, exist_ok=True) +with open(topology_out, "w", encoding="utf-8") as fh: + yaml.safe_dump(topology, fh, sort_keys=False) + +with open(config_in, encoding="utf-8") as fh: + config = yaml.safe_load(fh) or {} + +config["polar_rollout_url"] = f"http://127.0.0.1:{os.environ['ROLLOUT_PORT']}" +config["polar_gateway_url"] = f"http://127.0.0.1:{os.environ['GATEWAY_PORT']}" +config["polar_agent_cli_dir"] = os.environ["AGENT_CLI_DIR"] +config["polar_apptainer_image_dir"] = os.environ["APPTAINER_IMAGE_DIR"] +config["polar_request_timeout"] = int(os.environ["POLAR_REQUEST_TIMEOUT"]) +config["polar_max_async_level"] = int(os.environ["POLAR_MAX_ASYNC_LEVEL"]) +config["polar_min_complete_accept_fraction"] = float(os.environ["POLAR_MIN_COMPLETE_ACCEPT_FRACTION"]) + +task = config.setdefault("polar_task_template", {}) +task["timeout_seconds"] = int(os.environ["POLAR_TASK_TIMEOUT_SECONDS"]) +runtime = task.setdefault("runtime", {}) +memory_mb = os.environ.get("POLAR_RUNTIME_MEMORY_MB", "").strip() +if memory_mb: + memory_value = int(memory_mb) + if memory_value <= 0: + raise SystemExit(f"POLAR_RUNTIME_MEMORY_MB must be positive when set: {memory_mb}") + runtime["memory_mb"] = memory_value +else: + runtime.pop("memory_mb", None) + + +def env_flag(name: str) -> bool: + return str(os.environ.get(name, "")).strip().lower() in {"1", "true", "yes", "on"} + + +pi_settings = { + "compaction": {"enabled": env_flag("PI_COMPACTION_ENABLED")}, + "retry": { + "enabled": env_flag("PI_RETRY_ENABLED"), + "maxRetries": int(os.environ["PI_RETRY_MAX_RETRIES"]), + "provider": {"maxRetries": int(os.environ["PI_PROVIDER_MAX_RETRIES"])}, + }, +} +pi_settings_json = json.dumps(pi_settings, separators=(",", ":")) +pi_settings_b64 = base64.b64encode(pi_settings_json.encode("utf-8")).decode("ascii") +prepare_steps = runtime.get("prepare") +if not isinstance(prepare_steps, list): + prepare_steps = [] + runtime["prepare"] = prepare_steps +prepare_steps.insert( + 0, + { + "type": "exec", + "command": ( + 'mkdir -p "$HOME/.pi/agent" && ' + f"printf '%s' {shlex.quote(pi_settings_b64)} | base64 -d > \"$HOME/.pi/agent/settings.json\"" + ), + }, +) + +agent = task.setdefault("agent", {}) +agent["harness"] = "pi" +agent["model_name"] = os.environ["PI_MODEL_NAME"] +settings = agent.setdefault("settings", {}) +settings["api_type"] = os.environ["PI_API_TYPE"] +settings["context_window"] = int(os.environ["PI_CONTEXT_WINDOW"]) +settings["max_tokens"] = int(os.environ["PI_MAX_TOKENS"]) +settings.setdefault("compat", {})["maxTokensField"] = "max_tokens" +if os.environ.get("PI_THINKING"): + settings["thinking"] = os.environ["PI_THINKING"] + +task.setdefault("builder", {})["strategy"] = os.environ["POLAR_BUILDER_STRATEGY"] + +Path(config_out).parent.mkdir(parents=True, exist_ok=True) +with open(config_out, "w", encoding="utf-8") as fh: + yaml.safe_dump(config, fh, sort_keys=False) + +print(f"Topology: {topology_out}") +print(f"Polar config: {config_out}") +PY +} + +patch_ray_py312_if_needed() { + "${PYTHON_BIN}" - <<'PY' +from pathlib import Path +import ray + +base = Path(ray.__file__).resolve().parent / "experimental" / "channel" +patched = [] +for name in ("communicator.py", "common.py"): + path = base / name + if not path.is_file(): + continue + text = path.read_text(encoding="utf-8") + if "import ray.actor\n" in text: + continue + if "import ray\n" not in text: + continue + path.write_text(text.replace("import ray\n", "import ray\nimport ray.actor\n", 1), encoding="utf-8") + patched.append(str(path)) +if patched: + print("Patched Ray Python 3.12 channel imports:") + for path in patched: + print(f" {path}") +else: + print("Ray Python 3.12 channel import patch already applied or not needed.") +PY +} + +preflight() { + [ -x "${PYTHON_BIN}" ] || die "python not executable: ${PYTHON_BIN}" + command -v ray >/dev/null 2>&1 || die "ray command not found" + command -v "${POLAR_APPTAINER_BIN}" >/dev/null 2>&1 || [ -x "${POLAR_APPTAINER_BIN}" ] || die "apptainer not found: ${POLAR_APPTAINER_BIN}" + if is_path_like "${HF_CHECKPOINT}"; then + [ -d "${HF_CHECKPOINT}" ] || die "HF checkpoint not found: ${HF_CHECKPOINT}" + fi + [ -f "${FULL_PROMPT_DATA}" ] || die "training data not found: ${FULL_PROMPT_DATA}" + [ -f "${SLIME_DIR}/train_async.py" ] || die "Slime checkout missing at ${SLIME_DIR}" + [ -d "${MEGATRON_DIR}/megatron" ] || die "Megatron-LM checkout missing at ${MEGATRON_DIR}" + + if [ "${PATCH_SLIME}" = "1" ]; then + bash "${PROJECT_ROOT}/scripts/patch/patch_slime.sh" "${SLIME_DIR}" + fi + if [ "${PATCH_SGLANG}" = "1" ]; then + bash "${PROJECT_ROOT}/scripts/patch/patch_sglang.sh" + fi + if [ "${PATCH_RAY_PY312}" = "1" ]; then + patch_ray_py312_if_needed + fi + + if ! checkpoint_marker_exists "${REF_LOAD}"; then + if [ "${CONVERT_WEIGHTS}" = "1" ]; then + log "Converting HF weights to Megatron torch_dist: ${REF_LOAD}" + TORCH_DIST_DIR="${REF_LOAD}" HF_CHECKPOINT="${HF_CHECKPOINT}" \ + SLIME_DIR="${SLIME_DIR}" MEGATRON_DIR="${MEGATRON_DIR}" \ + bash "${SCRIPT_DIR}/convert_weights.sh" + else + die "Megatron torch_dist checkpoint missing at ${REF_LOAD}; set REF_LOAD or CONVERT_WEIGHTS=1" + fi + fi + + "${PYTHON_BIN}" - <<'PY' +import importlib +mods = ["ray", "torch", "sglang", "polar", "yaml", "httpx"] +missing = [] +for mod in mods: + try: + importlib.import_module(mod) + except Exception as exc: + missing.append(f"{mod}: {type(exc).__name__}: {exc}") +if missing: + raise SystemExit("Missing Python dependency/dependencies:\n" + "\n".join(missing)) +PY + + if [ "${USE_WANDB}" = "1" ] && [ "${WANDB_MODE}" = "online" ] && [ -z "${WANDB_API_KEY:-}" ]; then + die "WANDB_API_KEY is required for online W&B" + fi +} + +runtime_ld_library_path() { + local py_site + py_site="$("${PYTHON_BIN}" - <<'PY' +import site +paths = site.getsitepackages() +print(paths[0] if paths else "") +PY +)" + printf '%s' "${py_site}/torch/lib:${py_site}/nvidia/cuda_runtime/lib:${py_site}/nvidia/cuda_nvrtc/lib:${py_site}/nvidia/nvjitlink/lib:${py_site}/nvidia/cublas/lib:${py_site}/nvidia/cudnn/lib:${py_site}/nvidia/nccl/lib:${py_site}/nvidia/cusparse/lib:${py_site}/nvidia/cusolver/lib:${py_site}/nvidia/cufft/lib:${py_site}/nvidia/curand/lib:${LD_LIBRARY_PATH:-}" +} + +SERVICE_PIDS=() + +start_services_and_train() { + SERVICE_PIDS=() + cleanup() { + set +e + log "Cleaning up Polar/Ray processes" + local pid + for pid in "${SERVICE_PIDS[@]}"; do + kill -TERM "$pid" 2>/dev/null || true + done + sleep 2 + for pid in "${SERVICE_PIDS[@]}"; do + if kill -0 "$pid" 2>/dev/null; then + kill -KILL "$pid" 2>/dev/null || true + fi + done + if [ "${RAY_STOP_ON_EXIT}" = "1" ]; then + ray stop --force >/dev/null 2>&1 || true + fi + wait 2>/dev/null || true + } + trap cleanup EXIT + + log "Starting Polar rollout on :${ROLLOUT_PORT}" + polar serve_rollout -c "${TOPOLOGY_PATH}" >"${RUN_LOG_DIR}/rollout.log" 2>&1 & + SERVICE_PIDS+=("$!") + sleep 2 + + log "Starting Polar gateway on :${GATEWAY_PORT}" + polar serve_gateway -c "${TOPOLOGY_PATH}" --node-id localhost-node-01 >"${RUN_LOG_DIR}/gateway.log" 2>&1 & + SERVICE_PIDS+=("$!") + sleep 2 + + "${PYTHON_BIN}" - "${ROLLOUT_PORT}" <<'PY' +import sys, time, urllib.request +url = f"http://127.0.0.1:{sys.argv[1]}/health" +for _ in range(60): + try: + with urllib.request.urlopen(url, timeout=2) as resp: + if resp.status == 200: + print("rollout healthy") + raise SystemExit(0) + except Exception: + time.sleep(2) +raise SystemExit(f"rollout health check failed: {url}") +PY + + if [ "${RAY_USE_EXISTING_CLUSTER}" = "1" ]; then + log "Using existing Ray cluster at ${RAY_ADDRESS:-auto}; job server ${RAY_JOB_ADDRESS}" + else + log "Starting Ray with ${TOTAL_GPUS} GPUs" + ray stop --force >"${RUN_LOG_DIR}/ray-stop-before-start.log" 2>&1 || true + if ! ray start --head --node-ip-address 127.0.0.1 \ + --port "${RAY_PORT}" \ + --num-cpus "${RAY_NUM_CPUS}" \ + --num-gpus "${TOTAL_GPUS}" \ + --dashboard-port "${RAY_DASHBOARD_PORT}" \ + --temp-dir "${RAY_TMPDIR}" \ + --disable-usage-stats >"${RUN_LOG_DIR}/ray-start.log" 2>&1; then + tail -100 "${RUN_LOG_DIR}/ray-start.log" >&2 || true + return 1 + fi + RAY_ADDRESS="${RAY_ADDRESS:-127.0.0.1:${RAY_PORT}}" + export RAY_ADDRESS + fi + + local runtime_ld + runtime_ld="$(runtime_ld_library_path)" + local runtime_env_path="${RUN_DIR}/ray_runtime_env.yaml" + "${PYTHON_BIN}" - "${runtime_env_path}" <&1 | tee "${RUN_LOG_DIR}/ray-job-submit.log" + else + "${PYTHON_BIN}" - "${train_args[@]}" <<'PY' 2>&1 | tee "${RUN_LOG_DIR}/train-direct.log" +import os +import runpy +import sys + +import ray + +script = sys.argv[1] +script_args = sys.argv[2:] +address = os.environ.get("RAY_ADDRESS") or "auto" +ray.init(address=address, ignore_reinit_error=True) +sys.argv = [script, *script_args] +runpy.run_path(script, run_name="__main__") +PY + fi + local submit_rc="${PIPESTATUS[0]}" + set -e + return "${submit_rc}" +} + +export ROLLOUT_PORT GATEWAY_PORT SGLANG_ROUTER_BASE_URL MODEL_NAME PI_MODEL_NAME +export AGENT_CLI_DIR APPTAINER_IMAGE_DIR ROLLOUT_SAVE_DIR +export POLAR_BUILDER_STRATEGY POLAR_MIN_COMPLETE_ACCEPT_FRACTION POLAR_MAX_ASYNC_LEVEL +export POLAR_RUNTIME_MEMORY_MB POLAR_TASK_TIMEOUT_SECONDS POLAR_REQUEST_TIMEOUT +export PI_API_TYPE PI_CONTEXT_WINDOW PI_MAX_TOKENS PI_THINKING +export PI_FAIL_ON_CONTEXT_LIMIT PI_COMPACTION_ENABLED PI_RETRY_ENABLED PI_RETRY_MAX_RETRIES PI_PROVIDER_MAX_RETRIES +export SLIME_SGLANG_BASE_PORT SGLANG_LOG_LEVEL +export RAY_PORT RAY_NUM_CPUS USE_RAY_JOB_SUBMIT + +log "Training set source: ${FULL_PROMPT_DATA}" +log "Smoke rows: ${SMOKE_NUM_ROWS} (set SMOKE_NUM_ROWS=0 for full train set)" +log "Agent harness: pi" +log "Builder: ${POLAR_BUILDER_STRATEGY}" +log "Model served: ${MODEL_NAME}" +log "PI model name: ${PI_MODEL_NAME}" +log "PI fail-fast: context_limit=${PI_FAIL_ON_CONTEXT_LIMIT}, compaction=${PI_COMPACTION_ENABLED}, retry=${PI_RETRY_ENABLED}, provider_retries=${PI_PROVIDER_MAX_RETRIES}" +log "HF checkpoint: ${HF_CHECKPOINT}" +log "Megatron load: ${REF_LOAD}" +log "Training sqsh: ${TRAIN_SQSH}" +log "Runtime SIF source dir: ${SHARED_SIF_DIR}" +log "Runtime SIF local dir: ${APPTAINER_IMAGE_DIR}" +log "Agent CLI dir: ${AGENT_CLI_DIR}" +log "GPU split: total=${TOTAL_GPUS}, train=${TRAIN_NUM_GPUS}, rollout=${ROLLOUT_NUM_GPUS}, tp=${TENSOR_MODEL_PARALLEL_SIZE}" +log "Ray CPU resources: ${RAY_NUM_CPUS}" +log "Batch: rollout=${ROLLOUT_BATCH_SIZE}, samples/prompt=${N_SAMPLES_PER_PROMPT}, steps/rollout=${NUM_STEPS_PER_ROLLOUT}" +log "Distributed timeout: ${DISTRIBUTED_TIMEOUT_MINUTES} minutes" +log "Run log dir: ${RUN_LOG_DIR}" +log "Rollout trace dir: ${ROLLOUT_SAVE_DIR}" +log "W&B: mode=${WANDB_MODE}, project=${WANDB_PROJECT}, group=${WANDB_GROUP}, run_id=${WANDB_RUN_ID}" +log "Ray driver mode: $([ "${USE_RAY_JOB_SUBMIT}" = "1" ] && printf 'job-submit' || printf 'direct')" + +preflight +prepare_prompt_data_and_sifs +prepare_agent_cli_if_needed +render_runtime_configs + +if [ "${DRY_RUN}" = "1" ]; then + log "DRY_RUN=1; preflight and config rendering complete." + exit 0 +fi + +start_services_and_train diff --git a/examples/swegym_slime_grpo/submit_pi_slurm_apptainer.sh b/examples/swegym_slime_grpo/submit_pi_slurm_apptainer.sh new file mode 100644 index 000000000..c935b319c --- /dev/null +++ b/examples/swegym_slime_grpo/submit_pi_slurm_apptainer.sh @@ -0,0 +1,705 @@ +#!/usr/bin/env bash +#SBATCH --job-name=polar-swegym-pi-qwen35 +#SBATCH --account=nvr_lpr_agentic +#SBATCH --partition=batch_block1 +#SBATCH --nodes=8 +#SBATCH --ntasks-per-node=1 +#SBATCH --gpus-per-node=8 +#SBATCH --time=4:00:00 +#SBATCH --dependency=singleton +#SBATCH --exclusive +#SBATCH --mem=0 +#SBATCH --output=/lustre/fs1/portfolios/llmservice/projects/llmservice_fm_vision/users/shaokunz/polar_3/ProRL-Agent-Server/logs/slurm/%x-%j.out +#SBATCH --error=/lustre/fs1/portfolios/llmservice/projects/llmservice_fm_vision/users/shaokunz/polar_3/ProRL-Agent-Server/logs/slurm/%x-%j.err +#SBATCH --export=ALL + +# Multi-node Slurm launcher for PI + Qwen3.5-4B SWE-Gym GRPO. +# The Slurm/Pyxis layer starts one Ray cluster across the allocation, then the +# head rank delegates PI-specific setup and training arguments to +# run_pi_interactive_apptainer.sh. +set -euo pipefail + +# ============================================================================= +# User settings +# ============================================================================= +PROJECT_ROOT="${PROJECT_ROOT:-/lustre/fs1/portfolios/llmservice/projects/llmservice_fm_vision/users/shaokunz/polar_3/ProRL-Agent-Server}" +REFERENCE_ROOT="${REFERENCE_ROOT:-/lustre/fs1/portfolios/llmservice/projects/llmservice_fm_vision/users/shaokunz/polar/ProRL-Agent-Server}" +SCRIPT_DIR="${PROJECT_ROOT}/examples/swegym_slime_grpo" + +MODEL_NAME="${MODEL_NAME:-Qwen/Qwen3.5-4B}" +PI_MODEL_NAME="${PI_MODEL_NAME:-openai/${MODEL_NAME}}" +HF_CHECKPOINT="${HF_CHECKPOINT:-/lustre/fs1/portfolios/llmservice/projects/llmservice_fm_vision/users/shaokunz/model/Qwen3.5-4B}" +REF_LOAD="${REF_LOAD:-${REFERENCE_ROOT}/tmp/checkpoints/Qwen3.5-4B_torch_dist}" +FULL_PROMPT_DATA="${FULL_PROMPT_DATA:-${SCRIPT_DIR}/swegym_train_293.jsonl}" + +# Keep account/partition in the SBATCH header unchanged. This image/mount style +# follows the cluster's existing Slurm/Pyxis launchers. +GPUS_PER_NODE="${GPUS_PER_NODE:-8}" +TRAIN_SQSH="${TRAIN_SQSH:-/lustre/fs1/portfolios/llmservice/projects/llmservice_fm_vision/users/shaokunz/docker/polr_swegym_slime_grpo_train_codex_yazi_v2.sqsh}" +TRAIN_CONTAINER_MOUNTS="${TRAIN_CONTAINER_MOUNTS:-/lustre/fs1:/lustre/fs1,/lustre/fsw:/lustre/fsw}" +SLIME_DIR="${SLIME_DIR:-${REFERENCE_ROOT}/slime}" +MEGATRON_DIR="${MEGATRON_DIR:-${REFERENCE_ROOT}/Megatron-LM}" +SHARED_SIF_DIR="${SHARED_SIF_DIR:-/lustre/fs1/portfolios/nvr/projects/nvr_lpr_agentic/users/haozh/singularity_images_v3}" +APPTAINER_IMAGE_DIR="${APPTAINER_IMAGE_DIR:-${PROJECT_ROOT}/tmp/swegym_apptainer_images}" +AGENT_CLI_DIR="${AGENT_CLI_DIR:-${REFERENCE_ROOT}/tmp/swegym_agent_cli/opt_node}" +HOST_NVIDIA_LIB_DIR="${HOST_NVIDIA_LIB_DIR:-${PROJECT_ROOT}/tmp/host-nvidia-libs}" +HF_CACHE_ROOT="${HF_CACHE_ROOT:-/lustre/fs1/portfolios/llmservice/projects/llmservice_fm_vision/users/shaokunz/HG_Cache}" +HF_HOME="${HF_HOME:-${HF_CACHE_ROOT}}" +HUGGINGFACE_HUB_CACHE="${HUGGINGFACE_HUB_CACHE:-${HF_CACHE_ROOT}/hub}" +HF_HUB_CACHE="${HF_HUB_CACHE:-${HUGGINGFACE_HUB_CACHE}}" +TRANSFORMERS_CACHE="${TRANSFORMERS_CACHE:-${HF_CACHE_ROOT}/transformers}" +HF_DATASETS_CACHE="${HF_DATASETS_CACHE:-${HF_CACHE_ROOT}/datasets}" +HF_MODULES_CACHE="${HF_MODULES_CACHE:-${HF_CACHE_ROOT}/modules}" +SENTENCE_TRANSFORMERS_HOME="${SENTENCE_TRANSFORMERS_HOME:-${HF_CACHE_ROOT}/sentence_transformers}" + +# Full run defaults. Direct `sbatch submit_pi_slurm_apptainer.sh` launches the +# production one-epoch run. For a short launch test, edit SMOKE_NUM_ROWS below +# before submitting. +SMOKE_NUM_ROWS="${SMOKE_NUM_ROWS:-0}" +NUM_EPOCH="${NUM_EPOCH:-1}" +NUM_ROLLOUT="${NUM_ROLLOUT:-}" +SAVE_INTERVAL="${SAVE_INTERVAL:-}" + +TENSOR_MODEL_PARALLEL_SIZE="${TENSOR_MODEL_PARALLEL_SIZE:-8}" +ACTOR_NUM_NODES="${ACTOR_NUM_NODES:-}" +TRAIN_NUM_GPUS="${TRAIN_NUM_GPUS:-}" +ACTOR_NUM_GPUS_PER_NODE="${ACTOR_NUM_GPUS_PER_NODE:-}" +ROLLOUT_NUM_GPUS="${ROLLOUT_NUM_GPUS:-}" +ROLLOUT_NUM_GPUS_PER_ENGINE="${ROLLOUT_NUM_GPUS_PER_ENGINE:-1}" + +ROLLOUT_BATCH_SIZE="${ROLLOUT_BATCH_SIZE:-}" +N_SAMPLES_PER_PROMPT="${N_SAMPLES_PER_PROMPT:-}" +NUM_STEPS_PER_ROLLOUT="${NUM_STEPS_PER_ROLLOUT:-1}" +DISTRIBUTED_TIMEOUT_MINUTES="${DISTRIBUTED_TIMEOUT_MINUTES:-180}" +MAX_TOKENS_PER_GPU="${MAX_TOKENS_PER_GPU:-60000}" +ROLLOUT_MAX_RESPONSE_LEN="${ROLLOUT_MAX_RESPONSE_LEN:-16000}" +ROLLOUT_MAX_PROMPT_LEN="${ROLLOUT_MAX_PROMPT_LEN:-32000}" +SGLANG_CONTEXT_LENGTH="${SGLANG_CONTEXT_LENGTH:-50000}" +SGLANG_MEM_FRACTION_STATIC="${SGLANG_MEM_FRACTION_STATIC:-0.8}" +SGLANG_LOG_LEVEL="${SGLANG_LOG_LEVEL:-warning}" + +POLAR_BUILDER_STRATEGY="${POLAR_BUILDER_STRATEGY:-prefix_merging}" +POLAR_MIN_COMPLETE_ACCEPT_FRACTION="${POLAR_MIN_COMPLETE_ACCEPT_FRACTION:-0.6}" +POLAR_MAX_ASYNC_LEVEL="${POLAR_MAX_ASYNC_LEVEL:-2}" +POLAR_RUNTIME_MEMORY_MB="${POLAR_RUNTIME_MEMORY_MB:-262144}" +POLAR_TASK_TIMEOUT_SECONDS="${POLAR_TASK_TIMEOUT_SECONDS:-1200}" +POLAR_REQUEST_TIMEOUT="${POLAR_REQUEST_TIMEOUT:-1200}" +PI_API_TYPE="${PI_API_TYPE:-openai-completions}" +PI_MAX_TOKENS="${PI_MAX_TOKENS:-512}" +PI_CONTEXT_WINDOW="${PI_CONTEXT_WINDOW:-32000}" +if [ "${PI_CONTEXT_WINDOW}" -le 0 ]; then + fail "PI_CONTEXT_WINDOW must be positive; got ${PI_CONTEXT_WINDOW}" +fi +PI_THINKING="${PI_THINKING:-}" +PI_FAIL_ON_CONTEXT_LIMIT="${PI_FAIL_ON_CONTEXT_LIMIT:-1}" +if [ "${PI_FAIL_ON_CONTEXT_LIMIT}" = "1" ]; then + PI_COMPACTION_ENABLED="${PI_COMPACTION_ENABLED:-false}" + PI_RETRY_ENABLED="${PI_RETRY_ENABLED:-false}" +else + PI_COMPACTION_ENABLED="${PI_COMPACTION_ENABLED:-true}" + PI_RETRY_ENABLED="${PI_RETRY_ENABLED:-true}" +fi +PI_RETRY_MAX_RETRIES="${PI_RETRY_MAX_RETRIES:-0}" +PI_PROVIDER_MAX_RETRIES="${PI_PROVIDER_MAX_RETRIES:-0}" + +# Stability knobs for the PI run. Keep the batch, samples, timeout, context, +# and max-token settings fixed; these three values are the intended stability +# change after the observed late-run reward collapse. +TRAIN_LR="${TRAIN_LR:-5e-7}" +CLIP_GRAD="${CLIP_GRAD:-0.5}" +KL_LOSS_COEF="${KL_LOSS_COEF:-0.005}" +EPS_CLIP="${EPS_CLIP:-0.2}" +EPS_CLIP_HIGH="${EPS_CLIP_HIGH:-0.28}" +SLIME_ZERO_NONFINITE_GRADS="${SLIME_ZERO_NONFINITE_GRADS:-1}" +PYTORCH_CUDA_ALLOC_CONF="${PYTORCH_CUDA_ALLOC_CONF:-max_split_size_mb:2048,expandable_segments:True}" +PYTORCH_ALLOC_CONF="${PYTORCH_ALLOC_CONF:-${PYTORCH_CUDA_ALLOC_CONF}}" +RAY_MEMORY_USAGE_THRESHOLD="${RAY_MEMORY_USAGE_THRESHOLD:-0.99}" +RAY_memory_usage_threshold="${RAY_memory_usage_threshold:-${RAY_MEMORY_USAGE_THRESHOLD}}" +RAY_NUM_CPUS_PER_NODE="${RAY_NUM_CPUS_PER_NODE:-32}" +GPU_MONITOR_INTERVAL="${GPU_MONITOR_INTERVAL:-30}" +GPU_WANDB_MONITOR="${GPU_WANDB_MONITOR:-1}" +GPU_WANDB_MONITOR_INTERVAL="${GPU_WANDB_MONITOR_INTERVAL:-${GPU_MONITOR_INTERVAL}}" + +RAY_PORT="${RAY_PORT:-6379}" +RAY_DASHBOARD_PORT="${RAY_DASHBOARD_PORT:-28265}" +ROLLOUT_PORT="${ROLLOUT_PORT:-18080}" +GATEWAY_PORT="${GATEWAY_PORT:-18100}" +SGLANG_ROUTER_PORT="${SGLANG_ROUTER_PORT:-26000}" +SLIME_SGLANG_BASE_PORT="${SLIME_SGLANG_BASE_PORT:-34000}" + +USE_WANDB="${USE_WANDB:-1}" +WANDB_MODE="${WANDB_MODE:-online}" +WANDB_ENTITY="${WANDB_ENTITY:-NVR-LRP}" +WANDB_PROJECT="${WANDB_PROJECT:-polar-swegym-pi-qwen35-4b}" +WANDB_RANDOM_SUFFIX="${WANDB_RANDOM_SUFFIX:-0}" +WANDB_TEAM="${WANDB_TEAM:-${WANDB_ENTITY}}" +WANDB_HOST="${WANDB_HOST:-}" +WANDB_API_KEY="${WANDB_API_KEY:-${WANDB_KEY:-}}" + +PATCH_SLIME="${PATCH_SLIME:-1}" +PATCH_SGLANG="${PATCH_SGLANG:-1}" +PATCH_RAY_PY312="${PATCH_RAY_PY312:-1}" +FRESH_START="${FRESH_START:-auto}" +RESUME_FROM_SAVE="${RESUME_FROM_SAVE:-auto}" +AUTO_RESUME_FROM_SAVE="${AUTO_RESUME_FROM_SAVE:-1}" +START_ROLLOUT_ID="${START_ROLLOUT_ID:-}" +DRY_RUN="${DRY_RUN:-0}" + +# ============================================================================= +# Implementation +# ============================================================================= +LOG_DIR="${PROJECT_ROOT}/logs/slurm" + +die() { + printf 'ERROR: %s\n' "$*" >&2 + exit 1 +} + +is_positive_int() { + case "$1" in + ''|*[!0-9]*) return 1 ;; + 0) return 1 ;; + *) return 0 ;; + esac +} + +[ -d "${PROJECT_ROOT}" ] || die "PROJECT_ROOT does not exist: ${PROJECT_ROOT}" +[ -f "${SCRIPT_DIR}/run_pi_interactive_apptainer.sh" ] || die "missing PI launcher: ${SCRIPT_DIR}/run_pi_interactive_apptainer.sh" +[ -f "${TRAIN_SQSH}" ] || die "TRAIN_SQSH does not exist: ${TRAIN_SQSH}" +[ -d "${HF_CHECKPOINT}" ] || die "HF_CHECKPOINT does not exist: ${HF_CHECKPOINT}" +[ -f "${REF_LOAD}/latest_checkpointed_iteration.txt" ] || die "Megatron checkpoint missing latest marker: ${REF_LOAD}" +[ -f "${FULL_PROMPT_DATA}" ] || die "training data missing: ${FULL_PROMPT_DATA}" +[ -f "${SLIME_DIR}/train_async.py" ] || die "Slime checkout missing: ${SLIME_DIR}" +[ -d "${MEGATRON_DIR}/megatron" ] || die "Megatron-LM checkout missing: ${MEGATRON_DIR}" +[ -x "${AGENT_CLI_DIR}/bin/pi" ] || die "PI CLI missing: ${AGENT_CLI_DIR}/bin/pi" + +NUM_NODES="${SLURM_JOB_NUM_NODES:-${SLURM_NNODES:-}}" +[ -n "${NUM_NODES}" ] || die "This script must run inside an sbatch allocation" +case "${NUM_NODES}" in + 1|2|3|4|5|6|7|8) ;; + *) die "NUM_NODES must be between 1 and 8 for this launcher; got ${NUM_NODES}" ;; +esac + +TOTAL_GPUS="$((NUM_NODES * GPUS_PER_NODE))" +if [ -z "${ACTOR_NUM_NODES}" ]; then + if [ "${NUM_NODES}" -ge 8 ]; then + ACTOR_NUM_NODES=4 + else + ACTOR_NUM_NODES=1 + fi +fi +[ "${ACTOR_NUM_NODES}" -le "${NUM_NODES}" ] || die "ACTOR_NUM_NODES exceeds allocation nodes" + +if [ -z "${TRAIN_NUM_GPUS}" ] && [ -z "${ACTOR_NUM_GPUS_PER_NODE}" ]; then + TRAIN_NUM_GPUS="$((ACTOR_NUM_NODES * GPUS_PER_NODE))" +fi +if [ -z "${ACTOR_NUM_GPUS_PER_NODE}" ]; then + [ "$((TRAIN_NUM_GPUS % ACTOR_NUM_NODES))" -eq 0 ] || \ + die "TRAIN_NUM_GPUS (${TRAIN_NUM_GPUS}) must be divisible by ACTOR_NUM_NODES (${ACTOR_NUM_NODES})" + ACTOR_NUM_GPUS_PER_NODE="$((TRAIN_NUM_GPUS / ACTOR_NUM_NODES))" +else + TRAIN_NUM_GPUS="$((ACTOR_NUM_NODES * ACTOR_NUM_GPUS_PER_NODE))" +fi +[ "${ACTOR_NUM_GPUS_PER_NODE}" -le "${GPUS_PER_NODE}" ] || die "ACTOR_NUM_GPUS_PER_NODE exceeds GPUS_PER_NODE" + +if [ -z "${ROLLOUT_NUM_GPUS}" ]; then + ROLLOUT_NUM_GPUS="$((TOTAL_GPUS - TRAIN_NUM_GPUS))" +fi +[ "${TRAIN_NUM_GPUS}" -gt 0 ] || die "TRAIN_NUM_GPUS must be positive" +[ "${ROLLOUT_NUM_GPUS}" -gt 0 ] || die "ROLLOUT_NUM_GPUS must be positive" +[ "$((TRAIN_NUM_GPUS + ROLLOUT_NUM_GPUS))" -le "${TOTAL_GPUS}" ] || \ + die "train + rollout GPUs exceed allocation: train=${TRAIN_NUM_GPUS}, rollout=${ROLLOUT_NUM_GPUS}, total=${TOTAL_GPUS}" +[ "$((TRAIN_NUM_GPUS % TENSOR_MODEL_PARALLEL_SIZE))" -eq 0 ] || \ + die "TRAIN_NUM_GPUS (${TRAIN_NUM_GPUS}) must be divisible by TENSOR_MODEL_PARALLEL_SIZE (${TENSOR_MODEL_PARALLEL_SIZE})" + +DATA_PARALLEL_SIZE="$((TRAIN_NUM_GPUS / TENSOR_MODEL_PARALLEL_SIZE))" +if [ "${SMOKE_NUM_ROWS}" -gt 0 ]; then + ROLLOUT_BATCH_SIZE="${ROLLOUT_BATCH_SIZE:-1}" + N_SAMPLES_PER_PROMPT="${N_SAMPLES_PER_PROMPT:-${DATA_PARALLEL_SIZE}}" + NUM_ROLLOUT="${NUM_ROLLOUT:-1}" + SAVE_INTERVAL="${SAVE_INTERVAL:-1}" +else + ROLLOUT_BATCH_SIZE="${ROLLOUT_BATCH_SIZE:-4}" + N_SAMPLES_PER_PROMPT="${N_SAMPLES_PER_PROMPT:-16}" + SAVE_INTERVAL="${SAVE_INTERVAL:-1}" +fi + +ROLLOUT_SAMPLES_PER_STEP="$((ROLLOUT_BATCH_SIZE * N_SAMPLES_PER_PROMPT))" +[ "$((ROLLOUT_SAMPLES_PER_STEP % NUM_STEPS_PER_ROLLOUT))" -eq 0 ] || \ + die "rollout_batch_size * n_samples_per_prompt must be divisible by num_steps_per_rollout" +GLOBAL_BATCH_SIZE="$((ROLLOUT_SAMPLES_PER_STEP / NUM_STEPS_PER_ROLLOUT))" +[ "$((GLOBAL_BATCH_SIZE % DATA_PARALLEL_SIZE))" -eq 0 ] || \ + die "global batch size (${GLOBAL_BATCH_SIZE}) must be divisible by data parallel size (${DATA_PARALLEL_SIZE})" + +RUN_KIND="${RUN_KIND:-full293}" +if [ "${SMOKE_NUM_ROWS}" -gt 0 ]; then + RUN_KIND="debug${SMOKE_NUM_ROWS}" +fi +EXPERIMENT_TAG="${EXPERIMENT_TAG:-${RUN_KIND}_1ep_${NUM_NODES}n${TOTAL_GPUS}g_train${TRAIN_NUM_GPUS}_rollout${ROLLOUT_NUM_GPUS}_tp${TENSOR_MODEL_PARALLEL_SIZE}_dp${DATA_PARALLEL_SIZE}_rb${ROLLOUT_BATCH_SIZE}_n${N_SAMPLES_PER_PROMPT}}" +STABILITY_TAG="${STABILITY_TAG:-lr5e7_kl5e3_clip05}" +RUN_SERIES_ID="${RUN_SERIES_ID:-swegym_pi_qwen35_4b_${EXPERIMENT_TAG}_${STABILITY_TAG}_t1200_failctxb64}" +RUN_ID="${RUN_ID:-${RUN_SERIES_ID}}" +RUN_DIR="${RUN_DIR:-${PROJECT_ROOT}/tmp/${RUN_ID}}" +RUN_LOG_DIR="${RUN_LOG_DIR:-${RUN_DIR}/logs}" +ROLLOUT_SAVE_DIR="${ROLLOUT_SAVE_DIR:-${RUN_DIR}/rollout_results}" +SAVE_DIR="${SAVE_DIR:-${PROJECT_ROOT}/tmp/ckpt/${RUN_ID}}" +WANDB_GROUP="${WANDB_GROUP:-swegym-pi-qwen35-4b-${RUN_KIND}-${NUM_NODES}n${TOTAL_GPUS}g}" +WANDB_RUN_ID="${WANDB_RUN_ID:-${RUN_ID}}" +WANDB_DIR="${WANDB_DIR:-${PROJECT_ROOT}/logs/wandb}" +if [ -n "${NUM_ROLLOUT}" ]; then + is_positive_int "${NUM_ROLLOUT}" || die "NUM_ROLLOUT must be a positive integer when set: ${NUM_ROLLOUT}" + EXPECTED_ROLLOUTS="${NUM_ROLLOUT}" +else + is_positive_int "${NUM_EPOCH}" || die "NUM_EPOCH must be a positive integer: ${NUM_EPOCH}" + if [ "${SMOKE_NUM_ROWS}" -gt 0 ]; then + PROMPT_ROW_COUNT="${SMOKE_NUM_ROWS}" + else + PROMPT_ROW_COUNT="$(wc -l < "${FULL_PROMPT_DATA}" | tr -d '[:space:]')" + is_positive_int "${PROMPT_ROW_COUNT}" || die "could not count prompt rows in ${FULL_PROMPT_DATA}" + fi + EXPECTED_ROLLOUTS="$(( ((PROMPT_ROW_COUNT + ROLLOUT_BATCH_SIZE - 1) / ROLLOUT_BATCH_SIZE) * NUM_EPOCH ))" +fi + +if [ "${AUTO_RESUME_FROM_SAVE}" = "1" ]; then + if [ -f "${SAVE_DIR}/latest_checkpointed_iteration.txt" ]; then + FRESH_START=0 + RESUME_FROM_SAVE=1 + if [ -z "${START_ROLLOUT_ID}" ]; then + latest_checkpoint_iter="$(tr -d '\n[:space:]' < "${SAVE_DIR}/latest_checkpointed_iteration.txt")" + case "${latest_checkpoint_iter}" in + ''|*[!0-9]*) die "invalid latest checkpoint marker for START_ROLLOUT_ID: ${latest_checkpoint_iter}" ;; + esac + START_ROLLOUT_ID="$((latest_checkpoint_iter + 1))" + fi + else + FRESH_START=1 + RESUME_FROM_SAVE=0 + fi +else + if [ "${RESUME_FROM_SAVE}" = "auto" ]; then + RESUME_FROM_SAVE=0 + fi + if [ "${FRESH_START}" = "auto" ]; then + if [ "${RESUME_FROM_SAVE}" = "1" ]; then + FRESH_START=0 + else + FRESH_START=1 + fi + fi +fi +[ "${FRESH_START}" = "1" ] && [ "${RESUME_FROM_SAVE}" = "1" ] && \ + die "FRESH_START=1 conflicts with RESUME_FROM_SAVE=1" +if [ "${FRESH_START}" = "1" ] && [ -f "${SAVE_DIR}/latest_checkpointed_iteration.txt" ]; then + die "FRESH_START=1 refuses existing checkpoint in SAVE_DIR: ${SAVE_DIR}" +fi +if [ "${RESUME_FROM_SAVE}" = "1" ] && [ -n "${START_ROLLOUT_ID}" ] && \ + [ "${START_ROLLOUT_ID}" -ge "${EXPECTED_ROLLOUTS}" ]; then + cat < ${quarantine_dir}" + mv "${next_iter_dir}" "${quarantine_dir}" + fi + next_rollout_state=$(printf "%s/rollout/global_dataset_state_dict_%d.pt" "${SAVE_DIR}" "${START_ROLLOUT_ID}") + if [ -f "${next_rollout_state}" ]; then + quarantine_state="${next_rollout_state}.incomplete_${SLURM_JOB_ID:-manual}_$(date +%Y%m%d_%H%M%S)" + echo "[$(date -u +%Y-%m-%dT%H:%M:%SZ)] Quarantining incomplete rollout state ${next_rollout_state} -> ${quarantine_state}" + mv "${next_rollout_state}" "${quarantine_state}" + fi +fi + +mkdir -p "${LOG_DIR}" "${RUN_LOG_DIR}" "${RUN_DIR}" "${SAVE_DIR}" "${WANDB_DIR}" \ + "${HF_HOME}" "${HUGGINGFACE_HUB_CACHE}" "${HF_HUB_CACHE}" "${TRANSFORMERS_CACHE}" \ + "${HF_DATASETS_CACHE}" "${HF_MODULES_CACHE}" "${SENTENCE_TRANSFORMERS_HOME}" +STOP_FILE="${RUN_DIR}/slurm_stop_workers" +WORKER_SCRIPT="${RUN_DIR}/slurm_ray_worker_pi.sh" +rm -f "${STOP_FILE}" + +mapfile -t SLURM_NODES < <(scontrol show hostnames "${SLURM_NODELIST}") +HEAD_NODE="${SLURM_NODES[0]}" +RAY_HEAD_IP="$(srun --overlap --nodes=1 --ntasks=1 --ntasks-per-node=1 -w "${HEAD_NODE}" bash -lc 'hostname -I | cut -d" " -f1')" +RAY_ADDRESS="${RAY_HEAD_IP}:${RAY_PORT}" +RAY_JOB_ADDRESS="http://${RAY_HEAD_IP}:${RAY_DASHBOARD_PORT}" +RAY_NUM_CPUS="$((NUM_NODES * RAY_NUM_CPUS_PER_NODE))" +UNUSED_GPUS="$((TOTAL_GPUS - TRAIN_NUM_GPUS - ROLLOUT_NUM_GPUS))" + +cat >"${WORKER_SCRIPT}" <<'WORKER' +#!/usr/bin/env bash +set -euo pipefail + +rank="${SLURM_PROCID}" +node="$(hostname)" +node_ip="$(hostname -I | awk '{print $1}')" + +export PATH="/opt/polr_venv/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin" +export PYTHONNOUSERSITE=1 +export PYTHONPATH="${MEGATRON_DIR}:${SLIME_DIR}:${PROJECT_ROOT}/src:${PYTHONPATH:-}" +export HF_HOME="${HF_HOME}" +export HUGGINGFACE_HUB_CACHE="${HUGGINGFACE_HUB_CACHE}" +export HF_HUB_CACHE="${HF_HUB_CACHE}" +export TRANSFORMERS_CACHE="${TRANSFORMERS_CACHE}" +export HF_DATASETS_CACHE="${HF_DATASETS_CACHE}" +export HF_MODULES_CACHE="${HF_MODULES_CACHE}" +export SENTENCE_TRANSFORMERS_HOME="${SENTENCE_TRANSFORMERS_HOME}" + +PY_SITE="/opt/polr_venv/lib/python3.12/site-packages" +export LD_LIBRARY_PATH="${PY_SITE}/torch/lib:${PY_SITE}/nvidia/cuda_runtime/lib:${PY_SITE}/nvidia/cuda_nvrtc/lib:${PY_SITE}/nvidia/nvjitlink/lib:${PY_SITE}/nvidia/cublas/lib:${PY_SITE}/nvidia/cudnn/lib:${PY_SITE}/nvidia/nccl/lib:${PY_SITE}/nvidia/cusparse/lib:${PY_SITE}/nvidia/cusolver/lib:${PY_SITE}/nvidia/cufft/lib:${PY_SITE}/nvidia/curand/lib:${LD_LIBRARY_PATH:-}" +export PYTORCH_CUDA_ALLOC_CONF="${PYTORCH_CUDA_ALLOC_CONF}" +export PYTORCH_ALLOC_CONF="${PYTORCH_ALLOC_CONF}" +export RAY_memory_usage_threshold="${RAY_memory_usage_threshold:-${RAY_MEMORY_USAGE_THRESHOLD}}" +export RAY_MEMORY_USAGE_THRESHOLD="${RAY_MEMORY_USAGE_THRESHOLD}" + +NODE_CACHE_ROOT="/tmp/polar-swegym-pi-${SLURM_JOB_ID}-${rank}" +export HOME="${NODE_CACHE_ROOT}/home" +export APPTAINER_CACHEDIR="${NODE_CACHE_ROOT}/apptainer-cache" +export APPTAINER_TMPDIR="${NODE_CACHE_ROOT}/apptainer-tmp" +export APPTAINER_WORKDIR="${NODE_CACHE_ROOT}/apptainer-work" +export SINGULARITY_CACHEDIR="${APPTAINER_CACHEDIR}" +export SINGULARITY_TMPDIR="${APPTAINER_TMPDIR}" +export TRITON_CACHE_DIR="${NODE_CACHE_ROOT}/triton-cache" +export TRITON_HOME="${NODE_CACHE_ROOT}/triton-home" +export TORCHINDUCTOR_CACHE_DIR="${NODE_CACHE_ROOT}/torchinductor" +export TORCH_EXTENSIONS_DIR="${NODE_CACHE_ROOT}/torch-extensions" +export XDG_CACHE_HOME="${NODE_CACHE_ROOT}/xdg-cache" +export XDG_CONFIG_HOME="${NODE_CACHE_ROOT}/xdg-config" +export XDG_RUNTIME_DIR="${NODE_CACHE_ROOT}/xdg-runtime" +export CUDA_CACHE_PATH="${NODE_CACHE_ROOT}/cuda-cache" +export NUMBA_CACHE_DIR="${NODE_CACHE_ROOT}/numba" +export FLASHINFER_WORKSPACE_DIR="${NODE_CACHE_ROOT}/flashinfer-cache" +export RAY_TMPDIR="/tmp/polar-ray-pi-${SLURM_JOB_ID}-${rank}" + +mkdir -p "${HOME}" "${APPTAINER_CACHEDIR}" "${APPTAINER_TMPDIR}" "${APPTAINER_WORKDIR}" \ + "${TRITON_CACHE_DIR}" "${TRITON_HOME}" "${TORCHINDUCTOR_CACHE_DIR}" "${TORCH_EXTENSIONS_DIR}" \ + "${XDG_CACHE_HOME}" "${XDG_CONFIG_HOME}" "${XDG_RUNTIME_DIR}" "${CUDA_CACHE_PATH}" \ + "${NUMBA_CACHE_DIR}" "${FLASHINFER_WORKSPACE_DIR}" "${RAY_TMPDIR}" \ + "${HF_HOME}" "${HUGGINGFACE_HUB_CACHE}" "${HF_HUB_CACHE}" "${TRANSFORMERS_CACHE}" \ + "${HF_DATASETS_CACHE}" "${HF_MODULES_CACHE}" "${SENTENCE_TRANSFORMERS_HOME}" +chmod 700 "${HOME}" "${XDG_RUNTIME_DIR}" || true + +ray stop --force >/dev/null 2>&1 || true + +gpu_monitor_pid="" +gpu_wandb_monitor_pid="" + +gpu_id_csv() { + local start="$1" + local end="$2" + local out="" + local i + if [ "${end}" -lt "${start}" ]; then + printf '' + return + fi + for ((i = start; i <= end; i++)); do + if [ -n "${out}" ]; then + out+="," + fi + out+="${i}" + done + printf '%s' "${out}" +} + +start_gpu_monitor() { + if ! command -v nvidia-smi >/dev/null 2>&1; then + return + fi + local gpu_log="${RUN_LOG_DIR}/gpu-util-rank-${rank}.csv" + local gpu_err="${RUN_LOG_DIR}/gpu-util-rank-${rank}.err" + ( + if [ ! -s "${gpu_log}" ]; then + printf 'sample_time,host,rank,index,utilization_gpu_pct,utilization_memory_pct,memory_used_mb,memory_total_mb,power_draw_w\n' + fi + while true; do + sample_time="$(date +'%F %T')" + nvidia-smi --query-gpu=index,utilization.gpu,utilization.memory,memory.used,memory.total,power.draw \ + --format=csv,noheader,nounits | + while IFS= read -r line; do + printf '%s,%s,%s,%s\n' "${sample_time}" "${node}" "${rank}" "${line}" + done + sleep "${GPU_MONITOR_INTERVAL}" + done + ) >>"${gpu_log}" 2>>"${gpu_err}" & + gpu_monitor_pid="$!" +} + +start_wandb_gpu_monitor() { + if [ "${GPU_WANDB_MONITOR}" != "1" ]; then + return + fi + if [ ! -f "${PROJECT_ROOT}/scripts/monitor_wandb_gpu.py" ]; then + echo "GPU W&B monitor script missing: ${PROJECT_ROOT}/scripts/monitor_wandb_gpu.py" >&2 + return + fi + if ! command -v python3 >/dev/null 2>&1; then + echo "python3 missing; skipping GPU W&B monitor" >&2 + return + fi + + local train_gpus="" + local rollout_gpus="" + if [ "${rank}" -lt "${ACTOR_NUM_NODES}" ]; then + train_gpus="$(gpu_id_csv 0 "$((ACTOR_NUM_GPUS_PER_NODE - 1))")" + if [ "${ACTOR_NUM_GPUS_PER_NODE}" -lt "${GPUS_PER_NODE}" ]; then + rollout_gpus="$(gpu_id_csv "${ACTOR_NUM_GPUS_PER_NODE}" "$((GPUS_PER_NODE - 1))")" + fi + else + rollout_gpus="$(gpu_id_csv 0 "$((GPUS_PER_NODE - 1))")" + fi + + local monitor_log="${RUN_LOG_DIR}/wandb-gpu-monitor-rank-${rank}.log" + local monitor_csv="${RUN_LOG_DIR}/wandb-gpu-rank-${rank}.csv" + local wandb_args=() + if [ "${USE_WANDB}" = "1" ]; then + wandb_args=( + --wandb-project "${WANDB_PROJECT}" + --wandb-entity "${WANDB_ENTITY}" + --wandb-run-id "${WANDB_RUN_ID}" + --wandb-dir "${WANDB_DIR}" + --wandb-label "gpu-monitor-rank-${rank}" + --wandb-mode shared + ) + else + wandb_args=(--no-wandb) + fi + + ( + python3 "${PROJECT_ROOT}/scripts/monitor_wandb_gpu.py" \ + --interval-s "${GPU_WANDB_MONITOR_INTERVAL}" \ + --out-csv "${monitor_csv}" \ + --train-gpus "${train_gpus}" \ + --rollout-gpus "${rollout_gpus}" \ + --metric-prefix "polar_system/rank_${rank}" \ + "${wandb_args[@]}" + ) >"${monitor_log}" 2>&1 & + gpu_wandb_monitor_pid="$!" +} + +stop_gpu_monitor() { + if [ -n "${gpu_monitor_pid}" ]; then + kill "${gpu_monitor_pid}" 2>/dev/null || true + wait "${gpu_monitor_pid}" 2>/dev/null || true + gpu_monitor_pid="" + fi +} + +stop_wandb_gpu_monitor() { + if [ -n "${gpu_wandb_monitor_pid}" ]; then + kill "${gpu_wandb_monitor_pid}" 2>/dev/null || true + wait "${gpu_wandb_monitor_pid}" 2>/dev/null || true + gpu_wandb_monitor_pid="" + fi +} + +cleanup_ray() { + stop_wandb_gpu_monitor + stop_gpu_monitor + ray stop --force >/dev/null 2>&1 || true +} +trap cleanup_ray EXIT + +patch_node_runtime() { + if [ "${PATCH_SGLANG}" = "1" ]; then + local patch_log="${RUN_LOG_DIR}/patch-sglang-rank-${rank}.log" + echo "[$(date +'%F %T')] Patching SGLang inside rank ${rank} container on ${node}" + if ! bash "${PROJECT_ROOT}/scripts/patch/patch_sglang.sh" >"${patch_log}" 2>&1; then + echo "SGLang patch failed on rank ${rank}; see ${patch_log}" >&2 + tail -100 "${patch_log}" >&2 || true + exit 1 + fi + fi +} + +patch_node_runtime +start_gpu_monitor +start_wandb_gpu_monitor + +if [ "${rank}" = "0" ]; then + echo "[$(date +'%F %T')] Starting Ray head on ${node} (${RAY_HEAD_IP})" + ray start --head \ + --node-ip-address="${RAY_HEAD_IP}" \ + --port="${RAY_PORT}" \ + --dashboard-host=0.0.0.0 \ + --dashboard-port="${RAY_DASHBOARD_PORT}" \ + --num-cpus="${RAY_NUM_CPUS_PER_NODE}" \ + --num-gpus="${GPUS_PER_NODE}" \ + --temp-dir="${RAY_TMPDIR}" \ + --disable-usage-stats >"${RUN_LOG_DIR}/ray-head.log" 2>&1 + + python - <<'PY' +import os +import time + +import ray + +expected = float(os.environ["TOTAL_GPUS"]) +ray.init(address=os.environ["RAY_ADDRESS"]) +for _ in range(120): + resources = ray.cluster_resources() + got = float(resources.get("GPU", 0)) + print(f"Ray GPUs visible: {got}/{expected}", flush=True) + if got >= expected: + break + time.sleep(5) +else: + raise SystemExit(f"Ray cluster did not reach {expected} GPUs") +ray.shutdown() +PY + + finish_workers() { + touch "${STOP_FILE}" + ray stop --force >/dev/null 2>&1 || true + } + trap finish_workers EXIT + + export INSIDE_TRAIN_SQSH=1 + export USE_TRAIN_SQSH=1 + export RAY_USE_EXISTING_CLUSTER=1 + export RAY_STOP_ON_EXIT=0 + export USE_RAY_JOB_SUBMIT=0 + export RAY_ADDRESS + export RAY_JOB_ADDRESS + export RAY_NUM_CPUS + export SGLANG_ROUTER_HOST="${RAY_HEAD_IP}" + export SGLANG_ROUTER_BASE_URL="http://${RAY_HEAD_IP}:${SGLANG_ROUTER_PORT}" + + echo "[$(date +'%F %T')] Launching PI SWE-Gym training through run_pi_interactive_apptainer.sh" + bash "${PROJECT_ROOT}/examples/swegym_slime_grpo/run_pi_interactive_apptainer.sh" +else + echo "[$(date +'%F %T')] Waiting for Ray head at ${RAY_ADDRESS} from ${node} (${node_ip})" + python - <<'PY' +import os +import socket +import time + +host, port_s = os.environ["RAY_ADDRESS"].split(":") +port = int(port_s) +for _ in range(120): + try: + with socket.create_connection((host, port), timeout=2): + break + except OSError: + time.sleep(2) +else: + raise SystemExit(f"Ray head did not open at {host}:{port}") +PY + + echo "[$(date +'%F %T')] Starting Ray worker rank ${rank} on ${node} (${node_ip})" + ray start \ + --address="${RAY_ADDRESS}" \ + --node-ip-address="${node_ip}" \ + --num-cpus="${RAY_NUM_CPUS_PER_NODE}" \ + --num-gpus="${GPUS_PER_NODE}" \ + --temp-dir="${RAY_TMPDIR}" \ + --block >"${RUN_LOG_DIR}/ray-worker-${rank}.log" 2>&1 & + ray_pid="$!" + + while [ ! -f "${STOP_FILE}" ]; do + if ! kill -0 "${ray_pid}" 2>/dev/null; then + wait "${ray_pid}" + exit $? + fi + sleep 5 + done + ray stop --force >/dev/null 2>&1 || true + wait "${ray_pid}" 2>/dev/null || true +fi +WORKER +chmod +x "${WORKER_SCRIPT}" + +export PROJECT_ROOT REFERENCE_ROOT SCRIPT_DIR +export MODEL_NAME PI_MODEL_NAME HF_CHECKPOINT REF_LOAD FULL_PROMPT_DATA +export TRAIN_SQSH SLIME_DIR MEGATRON_DIR SHARED_SIF_DIR APPTAINER_IMAGE_DIR AGENT_CLI_DIR HOST_NVIDIA_LIB_DIR +export HF_CACHE_ROOT HF_HOME HUGGINGFACE_HUB_CACHE HF_HUB_CACHE TRANSFORMERS_CACHE HF_DATASETS_CACHE HF_MODULES_CACHE SENTENCE_TRANSFORMERS_HOME +export RUN_SERIES_ID RUN_ID RUN_DIR RUN_LOG_DIR ROLLOUT_SAVE_DIR SAVE_DIR STOP_FILE WORKER_SCRIPT +export SMOKE_NUM_ROWS NUM_EPOCH NUM_ROLLOUT SAVE_INTERVAL START_ROLLOUT_ID +export GPUS_PER_NODE TOTAL_GPUS TRAIN_NUM_GPUS ACTOR_NUM_NODES ACTOR_NUM_GPUS_PER_NODE +export ROLLOUT_NUM_GPUS ROLLOUT_NUM_GPUS_PER_ENGINE TENSOR_MODEL_PARALLEL_SIZE +export ROLLOUT_BATCH_SIZE N_SAMPLES_PER_PROMPT NUM_STEPS_PER_ROLLOUT DISTRIBUTED_TIMEOUT_MINUTES +export MAX_TOKENS_PER_GPU ROLLOUT_MAX_RESPONSE_LEN ROLLOUT_MAX_PROMPT_LEN +export SGLANG_CONTEXT_LENGTH SGLANG_MEM_FRACTION_STATIC SGLANG_LOG_LEVEL +export RAY_HEAD_IP RAY_ADDRESS RAY_JOB_ADDRESS RAY_PORT RAY_DASHBOARD_PORT RAY_NUM_CPUS RAY_NUM_CPUS_PER_NODE +export ROLLOUT_PORT GATEWAY_PORT SGLANG_ROUTER_PORT SLIME_SGLANG_BASE_PORT +export POLAR_BUILDER_STRATEGY POLAR_MIN_COMPLETE_ACCEPT_FRACTION POLAR_MAX_ASYNC_LEVEL +export POLAR_RUNTIME_MEMORY_MB POLAR_TASK_TIMEOUT_SECONDS POLAR_REQUEST_TIMEOUT +export PI_API_TYPE PI_CONTEXT_WINDOW PI_MAX_TOKENS PI_THINKING +export PI_FAIL_ON_CONTEXT_LIMIT PI_COMPACTION_ENABLED PI_RETRY_ENABLED PI_RETRY_MAX_RETRIES PI_PROVIDER_MAX_RETRIES +export TRAIN_LR CLIP_GRAD KL_LOSS_COEF EPS_CLIP EPS_CLIP_HIGH +export SLIME_ZERO_NONFINITE_GRADS PYTORCH_CUDA_ALLOC_CONF PYTORCH_ALLOC_CONF +export RAY_MEMORY_USAGE_THRESHOLD RAY_memory_usage_threshold GPU_MONITOR_INTERVAL +export GPU_WANDB_MONITOR GPU_WANDB_MONITOR_INTERVAL +export USE_WANDB WANDB_MODE WANDB_ENTITY WANDB_PROJECT WANDB_GROUP WANDB_RUN_ID WANDB_RANDOM_SUFFIX WANDB_TEAM WANDB_HOST WANDB_API_KEY WANDB_DIR +export PATCH_SLIME PATCH_SGLANG PATCH_RAY_PY312 FRESH_START RESUME_FROM_SAVE DRY_RUN + +cat <} + Expected: ${EXPECTED_ROLLOUTS} rollout(s) + Start rollout: ${START_ROLLOUT_ID:-} + Auto resume: ${AUTO_RESUME_FROM_SAVE} + Fresh start: ${FRESH_START}, resume_from_save=${RESUME_FROM_SAVE} + Stability: lr=${TRAIN_LR}, clip_grad=${CLIP_GRAD}, kl=${KL_LOSS_COEF}, eps=${EPS_CLIP}/${EPS_CLIP_HIGH}, max_tokens/gpu=${MAX_TOKENS_PER_GPU} + PI budget: sglang_ctx=${SGLANG_CONTEXT_LENGTH}, pi_context=${PI_CONTEXT_WINDOW}, pi_max_tokens=${PI_MAX_TOKENS} + PI fail-fast: context_limit=${PI_FAIL_ON_CONTEXT_LIMIT}, compaction=${PI_COMPACTION_ENABLED}, retry=${PI_RETRY_ENABLED}, provider_retries=${PI_PROVIDER_MAX_RETRIES} + Sandbox mem: ${POLAR_RUNTIME_MEMORY_MB:-} MB per command + Model served: ${MODEL_NAME} + PI model: ${PI_MODEL_NAME} + HF checkpoint: ${HF_CHECKPOINT} + HF cache: ${HF_HOME} + Megatron load: ${REF_LOAD} + Run series: ${RUN_SERIES_ID} + Run ID: ${RUN_ID} + Run dir: ${RUN_DIR} + Save dir: ${SAVE_DIR} + W&B project: ${WANDB_PROJECT} + W&B group: ${WANDB_GROUP} + W&B run id: ${WANDB_RUN_ID} + GPU W&B mon.: ${GPU_WANDB_MONITOR} every ${GPU_WANDB_MONITOR_INTERVAL}s +============================================= +EOF + +srun \ + --overlap \ + --nodes="${NUM_NODES}" \ + --ntasks="${NUM_NODES}" \ + --ntasks-per-node=1 \ + --gres="gpu:${GPUS_PER_NODE}" \ + --container-image="${TRAIN_SQSH}" \ + --container-mounts="${TRAIN_CONTAINER_MOUNTS}" \ + --container-workdir="${PROJECT_ROOT}" \ + --container-writable \ + --no-container-mount-home \ + bash "${WORKER_SCRIPT}" diff --git a/scripts/patch/patch_sglang.sh b/scripts/patch/patch_sglang.sh index 388f19d1c..498f969f9 100755 --- a/scripts/patch/patch_sglang.sh +++ b/scripts/patch/patch_sglang.sh @@ -169,21 +169,40 @@ utils_text = replace_once( utils_path.write_text(utils_text) # --------------------------------------------------------------------------- -# tokenizer_manager.py — expose the already-tokenized prompt via meta_info so -# serving_chat can emit input_token_ids without paying per-prompt-token -# logprob compute. +# tokenizer_manager.py — persist canonical prompt token IDs on ReqState right +# after tokenization, then expose them via meta_info so serving_chat can emit +# input_token_ids without paying per-prompt-token logprob compute. # --------------------------------------------------------------------------- tokenizer_manager_text = tokenizer_manager_path.read_text() tokenizer_manager_text = replace_once( tokenizer_manager_text, - " # Build meta_info and return value\n" - " meta_info = {\n" - " \"id\": rid,\n" - " \"finish_reason\": recv_obj.finished_reasons[i],\n" - " \"prompt_tokens\": recv_obj.prompt_tokens[i],\n" - " \"weight_version\": self.server_args.weight_version,\n" - " \"total_retractions\": recv_obj.retraction_counts[i],\n" - " }\n", + " output_ids: List[int] = dataclasses.field(default_factory=list)\n" + " input_token_logprobs_val: List[float] = dataclasses.field(default_factory=list)\n", + " output_ids: List[int] = dataclasses.field(default_factory=list)\n" + " input_token_ids: List[int] = dataclasses.field(default_factory=list)\n" + " input_token_logprobs_val: List[float] = dataclasses.field(default_factory=list)\n", + label=str(tokenizer_manager_path), +) +tokenizer_manager_text = replace_once( + tokenizer_manager_text, + " tokenized_obj.time_stats = self.rid_to_state[obj.rid].time_stats\n" + " self.rid_to_state[obj.rid].time_stats.set_tokenize_finish_time()\n" + "\n" + " return tokenized_obj\n", + " state = self.rid_to_state[obj.rid]\n" + " tokenized_obj.time_stats = state.time_stats\n" + " state.time_stats.set_tokenize_finish_time()\n" + " if isinstance(input_ids, list) and input_ids:\n" + " if isinstance(input_ids[0], int):\n" + " state.input_token_ids = list(input_ids)\n" + " elif isinstance(input_ids[0], list) and input_ids[0]:\n" + " state.input_token_ids = list(input_ids[0])\n" + "\n" + " return tokenized_obj\n", + label=str(tokenizer_manager_path), +) + +meta_info_base = ( " # Build meta_info and return value\n" " meta_info = {\n" " \"id\": rid,\n" @@ -192,14 +211,38 @@ tokenizer_manager_text = replace_once( " \"weight_version\": self.server_args.weight_version,\n" " \"total_retractions\": recv_obj.retraction_counts[i],\n" " }\n" - " _obj_input_ids = getattr(state.obj, \"input_ids\", None)\n" - " if isinstance(_obj_input_ids, list) and _obj_input_ids:\n" - " if isinstance(_obj_input_ids[0], int):\n" - " meta_info[\"input_token_ids\"] = list(_obj_input_ids)\n" - " elif isinstance(_obj_input_ids[0], list) and _obj_input_ids[0]:\n" - " meta_info[\"input_token_ids\"] = list(_obj_input_ids[0])\n", - label=str(tokenizer_manager_path), ) +meta_info_old_patch = ( + meta_info_base + + " _obj_input_ids = getattr(state.obj, \"input_ids\", None)\n" + + " if isinstance(_obj_input_ids, list) and _obj_input_ids:\n" + + " if isinstance(_obj_input_ids[0], int):\n" + + " meta_info[\"input_token_ids\"] = list(_obj_input_ids)\n" + + " elif isinstance(_obj_input_ids[0], list) and _obj_input_ids[0]:\n" + + " meta_info[\"input_token_ids\"] = list(_obj_input_ids[0])\n" +) +meta_info_new_patch = ( + meta_info_base + + " _state_input_ids = getattr(state, \"input_token_ids\", None)\n" + + " _obj_input_ids = _state_input_ids or getattr(state.obj, \"input_ids\", None)\n" + + " if isinstance(_obj_input_ids, list) and _obj_input_ids:\n" + + " if isinstance(_obj_input_ids[0], int):\n" + + " meta_info[\"input_token_ids\"] = list(_obj_input_ids)\n" + + " elif isinstance(_obj_input_ids[0], list) and _obj_input_ids[0]:\n" + + " meta_info[\"input_token_ids\"] = list(_obj_input_ids[0])\n" +) +if meta_info_new_patch not in tokenizer_manager_text: + if meta_info_old_patch in tokenizer_manager_text: + tokenizer_manager_text = tokenizer_manager_text.replace( + meta_info_old_patch, meta_info_new_patch, 1 + ) + else: + tokenizer_manager_text = replace_once( + tokenizer_manager_text, + meta_info_base, + meta_info_new_patch, + label=str(tokenizer_manager_path), + ) tokenizer_manager_path.write_text(tokenizer_manager_text) # --------------------------------------------------------------------------- diff --git a/src/polar/runtime/apptainer.py b/src/polar/runtime/apptainer.py index a0a52d68d..6bcaf05d1 100644 --- a/src/polar/runtime/apptainer.py +++ b/src/polar/runtime/apptainer.py @@ -26,6 +26,10 @@ def __init__(self, spec: RuntimeSpec, session_id: str, session_dir: Path) -> Non safe_name = session_id.replace("/", "-")[:30] self._instance_name = f"polar-{safe_name}-{short_hash}" self._binary = self._resolve_binary() + self._direct_exec = bool(os.environ.get("POLAR_APPTAINER_DIRECT_EXEC")) or bool( + spec.kwargs.get("direct_exec", False) + ) + self._overlay_dir = self.session_dir / "overlay" @property def runtime_id(self) -> str: @@ -35,6 +39,12 @@ def runtime_id(self) -> str: def supports_gpus(self) -> bool: return True + @property + def supports_memory_limits(self) -> bool: + # Apptainer has no portable memory flag on this cluster, but the + # command shell can enforce RLIMIT_AS for sandbox commands. + return True + @property def can_disable_internet(self) -> bool: return True @@ -44,8 +54,18 @@ async def start(self) -> None: raise RuntimeError("apptainer runtime was already destroyed") # Use a host-backed overlay directory instead of --writable-tmpfs # (default tmpfs overlay is only 64 MB, too small for most workloads). - self._overlay_dir = self.session_dir / "overlay" self._overlay_dir.mkdir(parents=True, exist_ok=True) + if self._direct_exec: + rc, _, stderr = await self._run_local_command( + *self._exec_prefix(), + "true", + capture=True, + ) + if rc != 0: + raise RuntimeError( + f"{self._binary} direct exec failed with exit code {rc}: {stderr}" + ) + return args = [self._binary, "instance", "start", "--overlay", str(self._overlay_dir)] if self.spec.gpus > 0: @@ -76,6 +96,8 @@ async def stop(self) -> None: if self._destroyed: return self._destroyed = True + if self._direct_exec: + return rc, _, stderr = await self._run_local_command( self._binary, "instance", "stop", self._instance_name, timeout=self._STOP_TIMEOUT, capture=True, @@ -99,13 +121,16 @@ async def exec( wrapped_command = command if effective_workdir: wrapped_command = f"cd {shlex.quote(effective_workdir)} && {command}" - shell_exports = [] + shell_prefix = [] + if self.spec.memory_mb is not None: + memory_kb = max(1, int(self.spec.memory_mb)) * 1024 + shell_prefix.append(f"ulimit -v {memory_kb};") for key in ("HOME", "PATH"): if key in effective_env: - shell_exports.append(f"export {key}={shlex.quote(str(effective_env[key]))};") - if shell_exports: - wrapped_command = " ".join(shell_exports + [wrapped_command]) - args = [self._binary, "exec", f"instance://{self._instance_name}"] + shell_prefix.append(f"export {key}={shlex.quote(str(effective_env[key]))};") + if shell_prefix: + wrapped_command = " ".join(shell_prefix + [wrapped_command]) + args = self._exec_prefix() if effective_env: args.append("env") args.extend(f"{key}={value}" for key, value in effective_env.items()) @@ -128,7 +153,7 @@ async def upload_file(self, local_path: str, remote_path: str) -> None: "bash", "-c", f"tar -cf - -C {shlex.quote(source_dir)} {shlex.quote(filename)} | " - f"{self._binary} exec instance://{self._instance_name} " + f"{shlex.join(self._exec_prefix())} " f"tar -xf - -C {shlex.quote(parent)}", capture=False, ) @@ -147,7 +172,7 @@ async def upload_dir(self, local_path: str, remote_path: str) -> None: "bash", "-c", f"tar -cf - -C {shlex.quote(local_path)} . | " - f"{self._binary} exec instance://{self._instance_name} " + f"{shlex.join(self._exec_prefix())} " f"tar -xf - -C {shlex.quote(remote_path)}", capture=False, ) @@ -164,7 +189,7 @@ async def download_file(self, remote_path: str, local_path: str) -> None: rc, _, _ = await self._run_local_command( "bash", "-c", - f"{self._binary} exec instance://{self._instance_name} " + f"{shlex.join(self._exec_prefix())} " f"tar -cf - -C {shlex.quote(parent)} {shlex.quote(filename)} | " f"tar -xf - -C {shlex.quote(local_dir)}", capture=False, @@ -181,7 +206,7 @@ async def download_dir(self, remote_path: str, local_path: str) -> None: rc, _, _ = await self._run_local_command( "bash", "-c", - f"{self._binary} exec instance://{self._instance_name} " + f"{shlex.join(self._exec_prefix())} " f"tar -cf - -C {shlex.quote(remote_path)} . | " f"tar -xf - -C {shlex.quote(local_path)}", capture=False, @@ -191,6 +216,25 @@ async def download_dir(self, remote_path: str, local_path: str) -> None: f"apptainer download_dir failed with exit code {rc}" ) + def _exec_prefix(self) -> list[str]: + if not self._direct_exec: + return [self._binary, "exec", f"instance://{self._instance_name}"] + args = [self._binary, "exec", "--overlay", str(self._overlay_dir)] + if self.spec.gpus > 0: + args.append("--nv") + network_name: str | None + if not self.spec.allow_internet: + network_name = "none" + else: + network_name = self.spec.network + if network_name and network_name != "host": + args.extend(["--net", "--network", network_name]) + args.extend(["--bind", f"{self.session_dir}:{self.runtime_session_dir}"]) + for volume in self.spec.kwargs.get("volumes", []): + args.extend(["--bind", str(volume)]) + args.append(self.spec.image) + return args + @staticmethod def _resolve_binary() -> str: override = os.environ.get("POLAR_APPTAINER_BIN")