From 9b36b171af18b0fbddd3a191e80be65c28fdd8bb Mon Sep 17 00:00:00 2001 From: astro-friedel Date: Tue, 28 Feb 2023 08:54:27 -0600 Subject: [PATCH 1/6] initial work toward integrating Parsl --- .gitignore | 1 + test_drivers/testing.yaml | 9 + test_drivers/time-lazy-flame1d.yaml | 25 ++ timing.py | 419 ++++++++++++++++++++++++++++ 4 files changed, 454 insertions(+) create mode 100644 test_drivers/testing.yaml create mode 100644 test_drivers/time-lazy-flame1d.yaml create mode 100644 timing.py diff --git a/.gitignore b/.gitignore index a4357ec29..c593efddb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ *~ .*.sw[op] ./*.png +.idea/ \ No newline at end of file diff --git a/test_drivers/testing.yaml b/test_drivers/testing.yaml new file mode 100644 index 000000000..cfda219d6 --- /dev/null +++ b/test_drivers/testing.yaml @@ -0,0 +1,9 @@ +timing: + sql_path: ./log_data + project: uiuc + queue: pbatch + nnodes: 1 + mirge_branch: production + pyopenclCTX: port:tesla + host: Lassen + gpu_arch: GV100GL diff --git a/test_drivers/time-lazy-flame1d.yaml b/test_drivers/time-lazy-flame1d.yaml new file mode 100644 index 000000000..4851ccba5 --- /dev/null +++ b/test_drivers/time-lazy-flame1d.yaml @@ -0,0 +1,25 @@ +driver: + name: flame1d-lazy + repo: illinois-ceesd/drivers_flame1d + branch: main + exename: &EXE flame1d + params: + nviz: 100 + nrestart: 100 + nhealth: 100 + nstatus: 100 + current_dt: 1.0e-8 + t_final: 2.e-7 + order: 1 + alpha_sc: 0.5 + s0_sc: -5.0 + kappa_sc: 0.5 + integrator: rk4 + health_pres_min: 1700 + health_pres_max: 280000 + timing_env_name: !join [*EXE, .lazy.timing.env] + summary_file_root: !join [*EXE, _lazy] + yaml_file_name: !join [*EXE, -timings.yaml] + logdir: !join [*EXE, _logs] + execopts: --lazy --log + mem_usage: False diff --git a/timing.py b/timing.py new file mode 100644 index 000000000..acae71ff7 --- /dev/null +++ b/timing.py @@ -0,0 +1,419 @@ +#!/usr/bin/env python3 + +import argparse +import os +import shutil +import yaml +from jsonschema import validate, exceptions +import datetime +import platform +import sys +import parsl +import hashlib +from git import Repo +from parsl.executors import HighThroughputExecutor +from parsl.providers import LSFProvider +from parsl.launchers import JsrunLauncher +from parsl.providers import SlurmProvider +from parsl.launchers import SrunLauncher + +from parsl.data_provider.files import File +from parsl.app.app import bash_app +from parsl.config import Config + +def nodeJoin(loader, nodeid): + return ''.join([str(item) for item in loader.construct_sequence(nodeid)]) + +yaml.add_constructor('!join', nodeJoin) + +TIMING_DATE = datetime.datetime.now().strftime("%Y-%m-%d %H:%M") + +print(datetime.datetime.now()) + +class Runner: + ITEMS = ['name', 'repo', 'branch', 'execname', 'params', 'pyopenclCTX', + 'execopts', 'timing_env_name', 'yaml_file_name', 'logdir', + 'mem_usage'] + def __init__(self, ymlFile): + self.timestamp = datetime.datetime.now().strftime("%Y.%m.%d-%H.%M.%S") + self.batch_output_file = None + self.fname = None + self.hash = None + self.md5sum = None + self.executor = None + self.work_dir = None + self.paramsfile = None + self.sqlfile = None + for it in Runner.ITEMS: + setattr(self, it, None) + + if not self.loadYaml(ymlFile): + raise Exception(f"Could not load test runner {ymlFile}") + + def setMD5(self, msum): + self.md5sum = msum + + def verify_runner(self, ymlFile): + self.fname = ymlFile + if os.path.isfile(ymlFile): + pass + elif os.path.isfile(ymlFile + '.yml'): + self.fname = ymlFile + '.yml' + elif os.path.isfile(ymlFile + '.yaml'): + self.fname = ymlFile + '.yaml' + else: + self.fname = None + + def get(self, item): + return getattr(self, item, None) + + def loadYaml(self, ymlFile, vars): + self.verify_runner(ymlFile) + if self.fname: + try: + validate(yaml.load(self.fname), yaml.load("test_schema.yaml")) + except exceptions.ValidationError: + return False + drv = yaml.load(open(self.fname, 'r'), Loader=yaml.FullLoader) + + for it in Runner.ITEMS: + setattr(self, it, drv['driver'][it]) + + self.batch_output_file = f"{self.summary_file_root}_{self.timestamp}.out" + self.sqlfile = os.path.join(vars['sql_path'], self.name) + '-rank0.sqlite' + return True + return False + + def get_driver(self): + cwd = os.getcwd() + os.system(f"rm -rf {self.name}") + + Repo.clone_from(f"https://github.com/{self.repo}", os.path.join(os.getcwd(), self.name), + branch=self.branch) + os.chdir(os.path.join(self.name, "timing_run")) + self.work_dir = os.getcwd() + drepo = Repo(os.getcwd()) + self.hash = drepo.rev_parse("HEAD").hexsha + drepo.close() + os.chdir(cwd) + + def write_params(self): + self.paramsfile = os.path.join(self.work_dir, self.name) + "_timing_params.yaml" + with open(self.paramsfile, 'w') as yfh: + yaml.dump(self.params, yfh) + + def getExecutor(self, emirge, vars): + if vars['host'].lower() == 'lassen': + self.executor = HighThroughputExecutor(label=self.name, + working_dir=self.work_dir, + address='lassen.llnl.gov', + worker_port_range=(5000, 55000), + worker_debug=True, + provider=LSFProvider(launcher=JsrunLauncher(overrides=''), + walltime='01:00:00', + nodes_per_block=vars['nnodes'], + init_blocks=vars['nnodes'], + max_blocks=vars['nnodes'], + bsub_redirection=True, + scheduler_options=f"#BSUB -q {vars['queue']}", + worker_init=("module load spectrum-mpi\n" + f"source {os.path.join(emirge, 'config', 'activate_env.sh')}\n" + f"export PYOPENCL_CTX=\"{vars['pyopenclCTX']}\"\n" + f"export XDG_CACHE_HOME=\"{os.path.join(os.sep, 'tmp', '$USER', 'xdg-scratch', self.name)}\"\n" + "rm -rf \$XDG_CACHE_HOME\n" + "rm -f timing-run-done\n" + "which python\n" + "conda env list\n" + "env\n" + "env | grep LSB_MCPU_HOSTS\n" + ), + project=vars['project'], + cmd_timeout=600 + ) + ) + + elif vars['host'].lower() == 'quartz': + # not completely configured yet + ''' + self.executor = HighThroughputExecutor(label=self.name, + working_dir='', + address='quartz.llnl.gov', + worker_port_range=(50000, 55000), + worker_debug=True, + provider=SlurmProvider(launcher=SrunLauncher(overrides=f''), + walltime='02:00:00', + nodes_per_block=1, + init_blocks=1, + max_blocks=1, + scheduler_options='#SBATCH -q pdebug', + worker_init=("module load spectrum-mpi\n" + f"export XDG_CACHE_HOME=/tmp/$USER/xdg-scratch_{self.name}\n" + "conda activate ...\n" + ), + cmd_timeout=600 + ) + )''' + raise Exception("Executor for quartz not implemented yet.") + else: + raise Exception(f"Could not create Executor for {vars['host']}") + + +def run(cls): + return f"python -O -u -m mpi4py {os.path.join(cls.work_dir, cls.name + '.py')} -i {cls.paramsfile} {cls.execopts}" + +def process(cls, var): + import subprocess + import socket + import datetime + import platform + + uname = platform.uname() + timing_platform = uname.system + timing_arch = uname.machine + + if not os.path.isfile(cls.sqlfile): + raise Exception(f"Timing run did not produce the expected sqlite file: {cls.sqlfile}") + if os.path.isfile(cls.yaml_file_name): + os.remove(cls.yaml_file_name) + summary_file_name = os.path.join(var['sql_path'], cls.summary_file_root + '_' + cls.timestamp) + '.sqlite' + if os.path.isfile(summary_file_name): + os.remove(summary_file_name) + # --- Pull the timings out of the sqlite files generated by logging + + rgather = subprocess.Popen(f"runalyzer-gather {summary_file_name} {self.sqlfile}", + stdout=subprocess.PIPE, shell=True, text=True) + outs, errs = rgather.communicate() + cld = subprocess.Popen(f"$(sqlite3 {summary_file_name} 'SELECT cl_device_name FROM runs')", + stdout=subprocess.PIPE, shell=True, text=True) + cl_device, errs = cld.communicate() + + stup = subprocess.Popen(f"$(runalyzer -m {summary_file_name} -c 'print(q(\"select $t_init.max\").fetchall()[0][0])' | grep -v INFO)", + stdout=subprocess.PIPE, shell=True, text=True) + startup_time, errs = stup.communicate() + + fst = subprocess.Popen(f"$(runalyzer -m {summary_file_name} -c 'print(sum(p[0] for p in q(\"select $t_step.max\").fetchall()[0:1]))' | grep -v INFO))", + stdout=subprocess.PIPE, shell=True, text=True) + first_step, errs = fst.communicate() + + ften = subprocess.Popen(f"$(runalyzer -m {summary_file_name} -c 'print(sum(p[0] for p in q(\"select $t_step.max\").fetchall()[0:10]))' | grep -v INFO)", + stdout=subprocess.PIPE, shell=True, text=True) + first_10_steps, errs = ften.communicate() + + sten = subprocess.Popen(f"$(runalyzer -m {summary_file_name} -c 'print(sum(p[0] for p in q(\"select $t_step.max\").fetchall()[10:19]))' | grep -v INFO)", + stdout=subprocess.PIPE, shell=True, text=True) + second_10_steps = sten.communicate() + + output = {'run_date': TIMING_DATE, + 'run_host': socket.gethostname(), + 'cl_device': cl_device, + 'run_epoch': datetime.datetime.now().timestamp(), + 'run_platform': timing_platform, + 'run_arch': timing_arch, + 'gpu_arch': vars['gpu_arch'], + 'mirge_version': vars['mirge_hash'], + } + + with open(cls.yaml_file_name, 'w') as fh: + # --- Create a YAML-compatible text snippet with the timing info + printf "\n\n" > ${YAML_FILE_NAME} + printf "\n" >> ${YAML_FILE_NAME} + printf \n\n" >> ${YAML_FILE_NAME} + printf "\n\n" >> ${YAML_FILE_NAME} + printf "mirge_version: ${MIRGE_HASH}\ny1_version: ${Y1_HASH}\n" >> ${YAML_FILE_NAME} + printf "driver_version: ${DRIVER_HASH}\ndriver_md5sum: ${DRIVER_MD5SUM}\n" >> ${YAML_FILE_NAME} + printf "time_startup: ${STARTUP_TIME}\ntime_first_step: ${FIRST_STEP}\n" >> ${YAML_FILE_NAME} + printf "time_first_10: ${FIRST_10_STEPS}\ntime_second_10: ${SECOND_10_STEPS}\n---\n" >> ${YAML_FILE_NAME} + + + if [ ! -z "${TESTING_SSH_KEY}" ]; then + eval $(ssh-agent) + trap "kill $SSH_AGENT_PID" EXIT + ssh-add ${TESTING_SSH_KEY} + fi + + # --- Update the timing data in the repo + # ---- First, clone the timing repo + git clone -b ${TIMING_BRANCH} git@github.com:${TIMING_REPO} + # ---- Create the timing file if it does not exist + if [[ ! -f timing/${YAML_FILE_NAME} ]]; then + touch timing/${YAML_FILE_NAME} + (cd timing && git add ${YAML_FILE_NAME}) + fi + # ---- Update the timing file with the current test data + cat ${YAML_FILE_NAME} >> timing/${YAML_FILE_NAME} + mkdir -p timing/${LOGDIR} + cp ${SUMMARY_FILE_NAME} timing/${LOGDIR} + cat *.out > timing/${LOGDIR}/${BATCH_OUTPUT_FILE} + cd timing + git add ${LOGDIR}/ + # ---- Commit the new data to the repo + (git commit -am "Automatic commit: ${TIMING_HOST} ${TIMING_DATE}" && git push) + cd ../ + + + +def setup_env(args): + cwd = os.getcwd() + # -- Install conda env, dependencies and MIRGE-Com via *emirge* + # --- remove old run if it exists + if os.path.isdir("emirge"): + shutil.move("emirge", "emirge.old") + os.system("rm -rf emirge.old &") + + # --- grab emirge and install MIRGE-Com + Repo.clone_from("https://github.com/illinois-ceesd/emirge.git", + os.path.join(cwd, "emirge")) + os.chdir("emirge") + os.system(f"./install.sh --branch=${args.mirge_branch} --env-name=timing_tests") + os.chdir("mirgecom") + mrepo = Repo(os.getcwd()) + # -- Grab and merge the branch with the case-dependent features + mhash = mrepo.rev_parse(f"origin/{args.mirge_branch}").hexsha + mrepo.close() + os.chdir(cwd) + return mhash + + +TIMING_REPO = "illinois-ceesd/timing.git" +TIMING_BRANCH = "main" + + +def parse_args(): + parser = argparse.ArgumentParser(description="Mirgecom timing tests") + parser.add_argument("-r", "--run", type=ascii, dest="runners", + action="store", nargs="?", + help="The tests to run (must match yaml file name)") + parser.add_argument("-m", "--build_mirgecom", actio='store_true', + dest="buld_mirecom", help="Whether to build mirgecom") + parser.add_argument("-b", "--mirge_branch", type=ascii, action='store', dest="mirge_branch", + default="production", help="The branch of emirge to build, Default is production") + parser.add_argument("-y", "--yml", action='store', default='testing.yaml') + parser.add_argument() + return parser.parse_args() + +if __name__ == '__main__': + timing_home = os.getcwd() + emirge_home = os.path.join(timing_home, 'emirge') + + cmdargs = parse_args() + master_vars = yaml.load(cmdargs.yml, Loader=yaml.FullLoader) + + runners = [] + for yml in cmdargs.runners: + runners.append(Runner(yml)) + master_vars['mirge_hash'] = setup_env(cmdargs) + executors = [] + jobs = [] + for runner in runners: + + runner.get_driver() + # --- Get an MD5Sum for the untracked timing driver + + md5_hash = hashlib.md5() + with open(runner.get('execname') + '.py', 'rb') as fh: + for block in iter(lambda: fh.read(4096), b""): + md5_hash.update(block) + runner.setMD5(md5_hash.hexdigest()) + runner.getExecutor(emirge_home, master_vars) + runner.write_params() + executors.append(runner.executor) + parsl.set_stream_logger() + parsl.clear() + parsl.load(Config(executors=executors)) + for runner in runners: + jobs.append((bash_app(executors=[runner.name]))(runner.run)()) + + + + + +timestamp=$(date "+%Y.%m.%d-%H.%M.%S") + +TIMING_DATE=$(date "+%Y-%m-%d %H:%M") +TIME_SINCE_EPOCH=$(date +%s) + + + + +# -- Run the case (platform-dependent) +printf "Running on Host: ${TIMING_HOST}\n" + + +date +# -- Process the results of the timing run +RUN_LOG_FILE="${SQL_PATH}/${exename}-rank0.sqlite" +if [[ -f "${RUN_LOG_FILE}" ]]; then + + rm -f ${YAML_FILE_NAME} + SUMMARY_FILE_NAME="${SQL_PATH}/${SUMMARY_FILE_ROOT}_${timestamp}.sqlite" + rm -f ${SUMMARY_FILE_NAME} + + # --- Pull the timings out of the sqlite files generated by logging + runalyzer-gather ${SUMMARY_FILE_NAME} ${RUN_LOG_FILE} + CL_DEVICE=$(sqlite3 ${SUMMARY_FILE_NAME} 'SELECT cl_device_name FROM runs') + STARTUP_TIME=$(runalyzer -m ${SUMMARY_FILE_NAME} -c 'print(q("select $t_init.max").fetchall()[0][0])' | grep -v INFO) + FIRST_STEP=$(runalyzer -m ${SUMMARY_FILE_NAME} -c 'print(sum(p[0] for p in q("select $t_step.max").fetchall()[0:1]))' | grep -v INFO) + FIRST_10_STEPS=$(runalyzer -m ${SUMMARY_FILE_NAME} -c 'print(sum(p[0] for p in q("select $t_step.max").fetchall()[0:10]))' | grep -v INFO) + SECOND_10_STEPS=$(runalyzer -m ${SUMMARY_FILE_NAME} -c 'print(sum(p[0] for p in q("select $t_step.max").fetchall()[10:19]))' | grep -v INFO) + MAX_PYTHON_MEM_USAGE=$(runalyzer -m ${SUMMARY_FILE_NAME} -c 'print(max(p[0] for p in q("select $memory_usage_python.max").fetchall()))' | grep -v INFO) + MAX_GPU_MEM_USAGE=$(runalyzer -m ${SUMMARY_FILE_NAME} -c 'print(max(p[0] for p in q("select $memory_usage_gpu.max").fetchall()))' | grep -v INFO) + + # --- Create a YAML-compatible text snippet with the timing info + printf "run_date: ${TIMING_DATE}\nrun_host: ${TIMING_HOST}\n" > ${YAML_FILE_NAME} + printf "cl_device: ${CL_DEVICE}\n" >> ${YAML_FILE_NAME} + printf "run_epoch: ${TIME_SINCE_EPOCH}\nrun_platform: ${TIMING_PLATFORM}\n" >> ${YAML_FILE_NAME} + printf "run_arch: ${TIMING_ARCH}\ngpu_arch: ${GPU_ARCH}\n" >> ${YAML_FILE_NAME} + printf "mirge_version: ${MIRGE_HASH}\ny1_version: ${Y1_HASH}\n" >> ${YAML_FILE_NAME} + printf "driver_version: ${DRIVER_HASH}\ndriver_md5sum: ${DRIVER_MD5SUM}\n" >> ${YAML_FILE_NAME} + printf "time_startup: ${STARTUP_TIME}\ntime_first_step: ${FIRST_STEP}\n" >> ${YAML_FILE_NAME} + printf "time_first_10: ${FIRST_10_STEPS}\ntime_second_10: ${SECOND_10_STEPS}\n" >> ${YAML_FILE_NAME} + printf "max_python_mem_usage: ${MAX_PYTHON_MEM_USAGE}\n" >> ${YAML_FILE_NAME} + printf "max_gpu_mem_usage: ${MAX_GPU_MEM_USAGE}\n---\n" >> ${YAML_FILE_NAME} + + # Users should set special keys for using git over + # ssh for security concerns. This snippet will use + # a pre-arranged ssh key if the user provides one + # and indicates it with the TESTING_SSH_KEY environment + # variable. + # ===== To create a key: + # - Run ssh-keygen: + # $ ssh-keygen + # [enter a when prompted] + # - Put the key(s) in a /secure/filesystem/location: + # $ mv * /secure/filesystem/location + # - Add the key to GIT: + # $ [browse to] https://github.com/illinois-ceesd/timing/settings/keys/new + # $ Choose (New SSH key) + # $ Paste in the contents of /secure/filesystem/location/.pub + # - Set the ENV variable before using this script: + # $ export TESTING_SSH_KEY=/secure/filesystem/location/ + if [ ! -z "${TESTING_SSH_KEY}" ]; then + eval $(ssh-agent) + trap "kill $SSH_AGENT_PID" EXIT + ssh-add ${TESTING_SSH_KEY} + fi + + # --- Update the timing data in the repo + # ---- First, clone the timing repo + git clone -b ${TIMING_BRANCH} git@github.com:${TIMING_REPO} + # ---- Create the timing file if it does not exist + if [[ ! -f timing/${YAML_FILE_NAME} ]]; then + touch timing/${YAML_FILE_NAME} + (cd timing && git add ${YAML_FILE_NAME}) + fi + # ---- Update the timing file with the current test data + cat ${YAML_FILE_NAME} >> timing/${YAML_FILE_NAME} + mkdir -p timing/${LOGDIR} + cp ${SUMMARY_FILE_NAME} timing/${LOGDIR} + cat *.out > timing/${LOGDIR}/${BATCH_OUTPUT_FILE} + cd timing + git add ${LOGDIR}/ + # ---- Commit the new data to the repo + (git commit -am "Automatic commit: ${TIMING_HOST} ${TIMING_DATE}" && git push) + cd ../ +else + printf "Timing run did not produce the expected sqlite file: ${RUN_LOG_FILE}\n" + exit 1 +fi + +date From 59a6c7edd50b97d873b3b3b50bbfff3df766ebc3 Mon Sep 17 00:00:00 2001 From: astro-friedel Date: Tue, 28 Feb 2023 13:30:52 -0600 Subject: [PATCH 2/6] fully pythonized parsl script for testing (not expected to work yet) --- test_drivers/testing.yaml | 2 + timing.py | 185 +++++++++++++------------------------- 2 files changed, 66 insertions(+), 121 deletions(-) diff --git a/test_drivers/testing.yaml b/test_drivers/testing.yaml index cfda219d6..c74aeae5d 100644 --- a/test_drivers/testing.yaml +++ b/test_drivers/testing.yaml @@ -7,3 +7,5 @@ timing: pyopenclCTX: port:tesla host: Lassen gpu_arch: GV100GL + timing_repo: illinois-ceesd/timing.git + timing_branch: main diff --git a/timing.py b/timing.py index acae71ff7..9c2ab9b65 100644 --- a/timing.py +++ b/timing.py @@ -4,12 +4,14 @@ import os import shutil import yaml +import socket from jsonschema import validate, exceptions import datetime import platform import sys import parsl import hashlib +from pathlib import Path from git import Repo from parsl.executors import HighThroughputExecutor from parsl.providers import LSFProvider @@ -18,7 +20,7 @@ from parsl.launchers import SrunLauncher from parsl.data_provider.files import File -from parsl.app.app import bash_app +from parsl.app.app import bash_app, python_app from parsl.config import Config def nodeJoin(loader, nodeid): @@ -44,6 +46,7 @@ def __init__(self, ymlFile): self.work_dir = None self.paramsfile = None self.sqlfile = None + self.mem_usage = False for it in Runner.ITEMS: setattr(self, it, None) @@ -158,10 +161,11 @@ def getExecutor(self, emirge, vars): raise Exception(f"Could not create Executor for {vars['host']}") -def run(cls): +def run(cls, outputs=[]): + outputs.append(File(cls.sqlfile)) return f"python -O -u -m mpi4py {os.path.join(cls.work_dir, cls.name + '.py')} -i {cls.paramsfile} {cls.execopts}" -def process(cls, var): +def process(cls, var, inputs=[]): import subprocess import socket import datetime @@ -171,8 +175,8 @@ def process(cls, var): timing_platform = uname.system timing_arch = uname.machine - if not os.path.isfile(cls.sqlfile): - raise Exception(f"Timing run did not produce the expected sqlite file: {cls.sqlfile}") + if not os.path.isfile(inputs[0]): + return False if os.path.isfile(cls.yaml_file_name): os.remove(cls.yaml_file_name) summary_file_name = os.path.join(var['sql_path'], cls.summary_file_root + '_' + cls.timestamp) + '.sqlite' @@ -180,7 +184,7 @@ def process(cls, var): os.remove(summary_file_name) # --- Pull the timings out of the sqlite files generated by logging - rgather = subprocess.Popen(f"runalyzer-gather {summary_file_name} {self.sqlfile}", + rgather = subprocess.Popen(f"runalyzer-gather {summary_file_name} {inputs[0]////////}", stdout=subprocess.PIPE, shell=True, text=True) outs, errs = rgather.communicate() cld = subprocess.Popen(f"$(sqlite3 {summary_file_name} 'SELECT cl_device_name FROM runs')", @@ -211,47 +215,30 @@ def process(cls, var): 'run_arch': timing_arch, 'gpu_arch': vars['gpu_arch'], 'mirge_version': vars['mirge_hash'], - } - - with open(cls.yaml_file_name, 'w') as fh: - # --- Create a YAML-compatible text snippet with the timing info - printf "\n\n" > ${YAML_FILE_NAME} - printf "\n" >> ${YAML_FILE_NAME} - printf \n\n" >> ${YAML_FILE_NAME} - printf "\n\n" >> ${YAML_FILE_NAME} - printf "mirge_version: ${MIRGE_HASH}\ny1_version: ${Y1_HASH}\n" >> ${YAML_FILE_NAME} - printf "driver_version: ${DRIVER_HASH}\ndriver_md5sum: ${DRIVER_MD5SUM}\n" >> ${YAML_FILE_NAME} - printf "time_startup: ${STARTUP_TIME}\ntime_first_step: ${FIRST_STEP}\n" >> ${YAML_FILE_NAME} - printf "time_first_10: ${FIRST_10_STEPS}\ntime_second_10: ${SECOND_10_STEPS}\n---\n" >> ${YAML_FILE_NAME} - - - if [ ! -z "${TESTING_SSH_KEY}" ]; then - eval $(ssh-agent) - trap "kill $SSH_AGENT_PID" EXIT - ssh-add ${TESTING_SSH_KEY} - fi + 'y1_version': vars['mirge_hash'], + 'driver_version': cls.hash, + 'driver_md5sum': cls.md5sum, + 'time_startup': startup_time, + 'time_first_step': first_step, + 'time_first_10': first_10_steps, + 'time_second_10': second_10_steps} - # --- Update the timing data in the repo - # ---- First, clone the timing repo - git clone -b ${TIMING_BRANCH} git@github.com:${TIMING_REPO} - # ---- Create the timing file if it does not exist - if [[ ! -f timing/${YAML_FILE_NAME} ]]; then - touch timing/${YAML_FILE_NAME} - (cd timing && git add ${YAML_FILE_NAME}) - fi - # ---- Update the timing file with the current test data - cat ${YAML_FILE_NAME} >> timing/${YAML_FILE_NAME} - mkdir -p timing/${LOGDIR} - cp ${SUMMARY_FILE_NAME} timing/${LOGDIR} - cat *.out > timing/${LOGDIR}/${BATCH_OUTPUT_FILE} - cd timing - git add ${LOGDIR}/ - # ---- Commit the new data to the repo - (git commit -am "Automatic commit: ${TIMING_HOST} ${TIMING_DATE}" && git push) - cd ../ + if cls.mem_usage: + mpmu = subprocess.Popen(f"$(runalyzer -m {summary_file_name} -c 'print(max(p[0] for p in q(\"select $memory_usage_python.max\").fetchall()))' | grep -v INFO)", + stdout=subprocess.PIPE, shell=True, text=True) + max_python_mem_usage, err = mpmu.communicate() + + mgmu = subprocess.Popen(f"$(runalyzer -m {summary_file_name} -c 'print(max(p[0] for p in q(\"select $memory_usage_gpu.max\").fetchall()))' | grep -v INFO)", + stdout=subprocess.PIPE, shell=True, text=True) + max_gpu_mem_usage, err = mgmu.communicate() + output['max_python_mem_usage'] = max_python_mem_usage + output['max_gpu_mem_usage'] = max_gpu_mem_usage + yaml.dump(output, open(cls.yaml_file_name, 'w')) + return True + def setup_env(args): cwd = os.getcwd() # -- Install conda env, dependencies and MIRGE-Com via *emirge* @@ -274,10 +261,6 @@ def setup_env(args): return mhash -TIMING_REPO = "illinois-ceesd/timing.git" -TIMING_BRANCH = "main" - - def parse_args(): parser = argparse.ArgumentParser(description="Mirgecom timing tests") parser.add_argument("-r", "--run", type=ascii, dest="runners", @@ -303,7 +286,8 @@ def parse_args(): runners.append(Runner(yml)) master_vars['mirge_hash'] = setup_env(cmdargs) executors = [] - jobs = [] + run_jobs = [] + process_jobs = [] for runner in runners: runner.get_driver() @@ -321,55 +305,12 @@ def parse_args(): parsl.clear() parsl.load(Config(executors=executors)) for runner in runners: - jobs.append((bash_app(executors=[runner.name]))(runner.run)()) - - - - - -timestamp=$(date "+%Y.%m.%d-%H.%M.%S") - -TIMING_DATE=$(date "+%Y-%m-%d %H:%M") -TIME_SINCE_EPOCH=$(date +%s) - - - - -# -- Run the case (platform-dependent) -printf "Running on Host: ${TIMING_HOST}\n" - - -date -# -- Process the results of the timing run -RUN_LOG_FILE="${SQL_PATH}/${exename}-rank0.sqlite" -if [[ -f "${RUN_LOG_FILE}" ]]; then - - rm -f ${YAML_FILE_NAME} - SUMMARY_FILE_NAME="${SQL_PATH}/${SUMMARY_FILE_ROOT}_${timestamp}.sqlite" - rm -f ${SUMMARY_FILE_NAME} - - # --- Pull the timings out of the sqlite files generated by logging - runalyzer-gather ${SUMMARY_FILE_NAME} ${RUN_LOG_FILE} - CL_DEVICE=$(sqlite3 ${SUMMARY_FILE_NAME} 'SELECT cl_device_name FROM runs') - STARTUP_TIME=$(runalyzer -m ${SUMMARY_FILE_NAME} -c 'print(q("select $t_init.max").fetchall()[0][0])' | grep -v INFO) - FIRST_STEP=$(runalyzer -m ${SUMMARY_FILE_NAME} -c 'print(sum(p[0] for p in q("select $t_step.max").fetchall()[0:1]))' | grep -v INFO) - FIRST_10_STEPS=$(runalyzer -m ${SUMMARY_FILE_NAME} -c 'print(sum(p[0] for p in q("select $t_step.max").fetchall()[0:10]))' | grep -v INFO) - SECOND_10_STEPS=$(runalyzer -m ${SUMMARY_FILE_NAME} -c 'print(sum(p[0] for p in q("select $t_step.max").fetchall()[10:19]))' | grep -v INFO) - MAX_PYTHON_MEM_USAGE=$(runalyzer -m ${SUMMARY_FILE_NAME} -c 'print(max(p[0] for p in q("select $memory_usage_python.max").fetchall()))' | grep -v INFO) - MAX_GPU_MEM_USAGE=$(runalyzer -m ${SUMMARY_FILE_NAME} -c 'print(max(p[0] for p in q("select $memory_usage_gpu.max").fetchall()))' | grep -v INFO) - - # --- Create a YAML-compatible text snippet with the timing info - printf "run_date: ${TIMING_DATE}\nrun_host: ${TIMING_HOST}\n" > ${YAML_FILE_NAME} - printf "cl_device: ${CL_DEVICE}\n" >> ${YAML_FILE_NAME} - printf "run_epoch: ${TIME_SINCE_EPOCH}\nrun_platform: ${TIMING_PLATFORM}\n" >> ${YAML_FILE_NAME} - printf "run_arch: ${TIMING_ARCH}\ngpu_arch: ${GPU_ARCH}\n" >> ${YAML_FILE_NAME} - printf "mirge_version: ${MIRGE_HASH}\ny1_version: ${Y1_HASH}\n" >> ${YAML_FILE_NAME} - printf "driver_version: ${DRIVER_HASH}\ndriver_md5sum: ${DRIVER_MD5SUM}\n" >> ${YAML_FILE_NAME} - printf "time_startup: ${STARTUP_TIME}\ntime_first_step: ${FIRST_STEP}\n" >> ${YAML_FILE_NAME} - printf "time_first_10: ${FIRST_10_STEPS}\ntime_second_10: ${SECOND_10_STEPS}\n" >> ${YAML_FILE_NAME} - printf "max_python_mem_usage: ${MAX_PYTHON_MEM_USAGE}\n" >> ${YAML_FILE_NAME} - printf "max_gpu_mem_usage: ${MAX_GPU_MEM_USAGE}\n---\n" >> ${YAML_FILE_NAME} + crunner = (bash_app(executors=[runner.name]))(run)(runner) + run_jobs.append(crunner) + process_jobs.append((python_app(executors=[runner.name])))(run)(runner, master_vars, inputs=crunner.outputs[0]) + # -- Run the case (platform-dependent) + #printf "Running on Host: ${TIMING_HOST}\n" # Users should set special keys for using git over # ssh for security concerns. This snippet will use # a pre-arranged ssh key if the user provides one @@ -387,33 +328,35 @@ def parse_args(): # $ Paste in the contents of /secure/filesystem/location/.pub # - Set the ENV variable before using this script: # $ export TESTING_SSH_KEY=/secure/filesystem/location/ - if [ ! -z "${TESTING_SSH_KEY}" ]; then - eval $(ssh-agent) - trap "kill $SSH_AGENT_PID" EXIT - ssh-add ${TESTING_SSH_KEY} - fi + if 'TESTING_SSH_KEY' in os.environ: + ssh_job = subprocess.Popen("eval $(ssh-agent); trap \"kill $SSH_AGENT_PID\" EXIT; ssh-add ${TESTING_SSH_KEY}", + stdout=subprocess.PIPE, shell=True, text=True) # --- Update the timing data in the repo # ---- First, clone the timing repo - git clone -b ${TIMING_BRANCH} git@github.com:${TIMING_REPO} + Repo.clone_from(f"git@github.com:{master_vars['timing_repo']}", os.path.join(os.getcwd(), 'timing'), + branch=master_vars['timing_branch']) + timing_repo = Repo(os.path.join(os.getcwd(), 'timing')) # ---- Create the timing file if it does not exist - if [[ ! -f timing/${YAML_FILE_NAME} ]]; then - touch timing/${YAML_FILE_NAME} - (cd timing && git add ${YAML_FILE_NAME}) - fi - # ---- Update the timing file with the current test data - cat ${YAML_FILE_NAME} >> timing/${YAML_FILE_NAME} - mkdir -p timing/${LOGDIR} - cp ${SUMMARY_FILE_NAME} timing/${LOGDIR} - cat *.out > timing/${LOGDIR}/${BATCH_OUTPUT_FILE} - cd timing - git add ${LOGDIR}/ + count = 0 + for runner, i in enumerate(runners): + if not process_job.result(): + print(f"Timing run did not produce the expected sqlite file: {runner.sqlfile}\n") + continue + count += 1 + if not os.path.exists(os.path.join("timing", runner.yaml_file_name)) + Path(os.path.join(os.getcwd(), 'timing', runner.yaml_file_name)).touch() + timing_repo.index.add([os.path.join("timing", runner.yaml_file_name)]) + + # ---- Update the timing file with the current test data + + os.system(f"cat {runner.yaml_file_name} >> timing/{runner.yaml_file_name}") + os.makedirs(os.path.join('timin', runner.logdir), exist_ok=True) + summary_file_name = os.path.join(master_vars['sql_path'], runner.summary_file_root + '_' + runner.timestamp) + '.sqlite' + shutil.copy(summary_file_name, os.path.join("timing", runner.logdir)) + os.system(f"cat *.out > timing/{runner.logdir}/{runner.batch_output_file}") + timing_repo.index.add([os.path.join('timing', runner.logdir]) # ---- Commit the new data to the repo - (git commit -am "Automatic commit: ${TIMING_HOST} ${TIMING_DATE}" && git push) - cd ../ -else - printf "Timing run did not produce the expected sqlite file: ${RUN_LOG_FILE}\n" - exit 1 -fi - -date + if count > 0 + commit = timing_repo.indexs.commit(f"Automatic commit: {socket.gethostname()} {TIMING_DATE}") + timing_repo.remotes.origin.push() From cf147330c42c3424bc1b23080c5a209ee72bee34 Mon Sep 17 00:00:00 2001 From: astro-friedel Date: Tue, 14 Mar 2023 15:39:33 -0500 Subject: [PATCH 3/6] yaml files for the other drivers --- test_drivers/time-lazy-isolator.yaml | 25 +++++++++++++++++++++++++ test_drivers/time-lazy-nozzle.yaml | 21 +++++++++++++++++++++ 2 files changed, 46 insertions(+) create mode 100644 test_drivers/time-lazy-isolator.yaml create mode 100644 test_drivers/time-lazy-nozzle.yaml diff --git a/test_drivers/time-lazy-isolator.yaml b/test_drivers/time-lazy-isolator.yaml new file mode 100644 index 000000000..1c5b0c332 --- /dev/null +++ b/test_drivers/time-lazy-isolator.yaml @@ -0,0 +1,25 @@ +driver: + name: y2-isolator + repo: illinois-ceesd/drivers_y2-isolator + branch: main + exename: &EXE isolator + params: + nviz: 100 + nrestart: 100 + nhealth: 100 + nstatus: 100 + current_dt: 1.0e-8 + t_final: 2.e-7 + order: 1 + alpha_sc: 0.5 + s0_sc: -5.0 + kappa_sc: 0.5 + integrator: rk4 + health_pres_min: 1700 + health_pres_max: 280000 + timing_env_name: !join [ *EXE, .lazy.timing.env ] + summary_file_root: !join [ *EXE, _lazy ] + yaml_file_name: !join [ *EXE, -timings.yaml ] + logdir: !join [ *EXE, _logs ] + execopts: --lazy --log + mem_usage: True diff --git a/test_drivers/time-lazy-nozzle.yaml b/test_drivers/time-lazy-nozzle.yaml new file mode 100644 index 000000000..3701ecceb --- /dev/null +++ b/test_drivers/time-lazy-nozzle.yaml @@ -0,0 +1,21 @@ +driver: + name: y1-production-nozzle-lazy + repo: illinois-ceesd/drivers_y1-nozzle + branch: main + exename: &EXE nozzle + params: + nviz: 100 + nrestart: 100 + current_dt: 5e-8 + t_final: 1.e-6 + order: 1 + alpha_sc: 0.5 + s0_sc: -5.0 + kappa_sc: 0.5 + logDependent: 0 + timing_env_name: !join [*EXE, .lazy.timing.env] + summary_file_root: !join [ *EXE, _lazy ] + yaml_file_name: !join [ *EXE, -lazy-timings.yaml] + logdir: !join [ *EXE, _lazy_logs ] + execopts: --lazy --log + mem_usage: True From 1e2e031afa7b33b85a23966288a259871944e449 Mon Sep 17 00:00:00 2001 From: astro-friedel Date: Tue, 14 Mar 2023 15:39:58 -0500 Subject: [PATCH 4/6] update to the code, added schema checking and some bug fixes --- timing.py | 275 +++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 221 insertions(+), 54 deletions(-) diff --git a/timing.py b/timing.py index 9c2ab9b65..2fba13fe7 100644 --- a/timing.py +++ b/timing.py @@ -4,39 +4,137 @@ import os import shutil import yaml -import socket from jsonschema import validate, exceptions +import subprocess import datetime -import platform -import sys import parsl import hashlib +import socket from pathlib import Path from git import Repo from parsl.executors import HighThroughputExecutor from parsl.providers import LSFProvider from parsl.launchers import JsrunLauncher -from parsl.providers import SlurmProvider -from parsl.launchers import SrunLauncher +# quartz imports +# from parsl.providers import SlurmProvider +# from parsl.launchers import SrunLauncher -from parsl.data_provider.files import File from parsl.app.app import bash_app, python_app from parsl.config import Config + def nodeJoin(loader, nodeid): + """ Helper function to handle the !join keyword in yaml documents + + Parameters + ---------- + loader: yaml.Loader instance + nodeid: the name of the node to operate on + + Example + ------- + yaml document as follows: + + exec: &EXEC isolator + log: !join [ *EXEC, .log ] + + will produce + { + "exec": "isolator", + "log": "isolator.log" + } + + when loaded + """ return ''.join([str(item) for item in loader.construct_sequence(nodeid)]) + +# add the nodeJoin function to the yal Loader yaml.add_constructor('!join', nodeJoin) TIMING_DATE = datetime.datetime.now().strftime("%Y-%m-%d %H:%M") print(datetime.datetime.now()) -class Runner: +# schema for the main testing yaml +TIMING_SCHEMA = {"title": "Timing Global Schema", + "description": "Schema for main timing test process", + "type": "object", + "properties": { + "timing": { + "$ref": "#/definitions/Timing" + } + }, + "required": ["timing"], + "definitions": { + "Timing": { + "type": "object", + "properties": { + "sql_path": {"type": "string"}, + "project": {"type": "string"}, + "queue": {"type": "string"}, + "nnodes": {"type": "number", "minimum": 1}, + "mirge_branch": {"type": "string"}, + "pyopenclCTX": {"type": "string"}, + "host": {"type": "string"}, + "gpu_arch": {"type": "string"}, + "timing_repo": {"type": "string"}, + "timing_branch": {"type": "string"} + }, + "required": ["sql_path", "project", "queue", "nnodes", "mirge_branch", "pyopenclCTX", + "host", "gpu_arch", "timing_repo", "timing_branch"] + } + } + } + +# schema for the driver yamls +DRIVER_SCHEMA = {"title": "Driver Schema", + "description": "Schema for testing driver", + "type": "object", + "properties": { + "driver": { + "$ref": "#/definitions/Driver" + } + }, + "required": ["driver"], + "definitions": { + "Driver": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "repo": {"type": "string"}, + "branch": {"type": "string"}, + "exename": {"type": "string"}, + "params": {"type": "object"}, + "timing_env_name": {"type": "string"}, + "summary_file_root": {"type": "string"}, + "yaml_file_name": {"type": "string"}, + "logdir": {"type": "string"}, + "execopts": {"type": "string"}, + "mem_usage": {"type": "boolean"} + }, + "required": ["name", "repo", "branch", "exename", "params", "timing_env_name", + "summary_file_root", "yaml_file_name", "logdir", "execopts"] + } + } + } + + +class Driver: + """ Class to define a test instance + + Parameters + ---------- + ymlFile: str, the name of the yaml file giving the parameters for the run + lazy: bool, whether the test is a lazy test + root_dir: str, the root directory for the tests + test_vars: dict, Listing of the overall testing variables + """ ITEMS = ['name', 'repo', 'branch', 'execname', 'params', 'pyopenclCTX', 'execopts', 'timing_env_name', 'yaml_file_name', 'logdir', 'mem_usage'] - def __init__(self, ymlFile): + + def __init__(self, ymlFile, lazy, root_dir, test_vars): self.timestamp = datetime.datetime.now().strftime("%Y.%m.%d-%H.%M.%S") self.batch_output_file = None self.fname = None @@ -47,47 +145,71 @@ def __init__(self, ymlFile): self.paramsfile = None self.sqlfile = None self.mem_usage = False - for it in Runner.ITEMS: + self.root_dir = root_dir + self.yamlFile = "time-" + if lazy: + self.yamlFile += "lazy-" + self.yamlFile += ymlFile + for it in Driver.ITEMS: setattr(self, it, None) - if not self.loadYaml(ymlFile): - raise Exception(f"Could not load test runner {ymlFile}") + # load the given yaml file + if not self.loadYaml(test_vars): + raise Exception(f"Could not load test runner {self.yamlFile}") def setMD5(self, msum): + """ Set the md5 sum""" self.md5sum = msum - def verify_runner(self, ymlFile): - self.fname = ymlFile - if os.path.isfile(ymlFile): + def verify_runner(self): + """ Function to determine the actual name of the yaml file. It tries: + yamlFile, yamlFile + '.yml', and yamlFile + '.yaml' + """ + self.fname = os.path.join(self.root_dir, 'timing', self.yamlFile) + if os.path.isfile(self.fname): pass - elif os.path.isfile(ymlFile + '.yml'): - self.fname = ymlFile + '.yml' - elif os.path.isfile(ymlFile + '.yaml'): - self.fname = ymlFile + '.yaml' + elif os.path.isfile(self.fname + '.yml'): + self.fname += '.yml' + elif os.path.isfile(self.fname + '.yaml'): + self.fname += '.yaml' else: self.fname = None def get(self, item): + """ General getter function""" return getattr(self, item, None) - def loadYaml(self, ymlFile, vars): - self.verify_runner(ymlFile) + def loadYaml(self, var): + """ Function to load the driver yaml file, this includes error checking against + the schema, and adding the loaded values to the class member variables + + Parameters + ---------- + var: dict, Listing of the overall testing variables + + Returns + ------- + Bool, True if the yaml was loaded, False otherwise. + """ + self.verify_runner() if self.fname: try: - validate(yaml.load(self.fname), yaml.load("test_schema.yaml")) + validate(yaml.load(self.fname), DRIVER_SCHEMA) except exceptions.ValidationError: return False drv = yaml.load(open(self.fname, 'r'), Loader=yaml.FullLoader) - for it in Runner.ITEMS: + for it in Driver.ITEMS: setattr(self, it, drv['driver'][it]) self.batch_output_file = f"{self.summary_file_root}_{self.timestamp}.out" - self.sqlfile = os.path.join(vars['sql_path'], self.name) + '-rank0.sqlite' + self.sqlfile = os.path.join(var['sql_path'], self.name) + '-rank0.sqlite' return True return False def get_driver(self): + """ Function to clone the given driver repo + """ cwd = os.getcwd() os.system(f"rm -rf {self.name}") @@ -101,12 +223,21 @@ def get_driver(self): os.chdir(cwd) def write_params(self): + """ Write the driver parameters to a yaml file for the test to run with + """ self.paramsfile = os.path.join(self.work_dir, self.name) + "_timing_params.yaml" with open(self.paramsfile, 'w') as yfh: yaml.dump(self.params, yfh) - def getExecutor(self, emirge, vars): - if vars['host'].lower() == 'lassen': + def getExecutor(self, emirge, var): + """ Define the Parsl Executor to use with this test. + + Parameters + ---------- + emirge: str, path to the emirge directory + var: dict, Listing of the overall testing variables + """ + if var['host'].lower() == 'lassen': self.executor = HighThroughputExecutor(label=self.name, working_dir=self.work_dir, address='lassen.llnl.gov', @@ -114,28 +245,28 @@ def getExecutor(self, emirge, vars): worker_debug=True, provider=LSFProvider(launcher=JsrunLauncher(overrides=''), walltime='01:00:00', - nodes_per_block=vars['nnodes'], - init_blocks=vars['nnodes'], - max_blocks=vars['nnodes'], + nodes_per_block=var['nnodes'], + init_blocks=var['nnodes'], + max_blocks=var['nnodes'], bsub_redirection=True, - scheduler_options=f"#BSUB -q {vars['queue']}", + queue=var['queue'], worker_init=("module load spectrum-mpi\n" f"source {os.path.join(emirge, 'config', 'activate_env.sh')}\n" - f"export PYOPENCL_CTX=\"{vars['pyopenclCTX']}\"\n" + f"export PYOPENCL_CTX=\"{var['pyopenclCTX']}\"\n" f"export XDG_CACHE_HOME=\"{os.path.join(os.sep, 'tmp', '$USER', 'xdg-scratch', self.name)}\"\n" - "rm -rf \$XDG_CACHE_HOME\n" + "rm -rf $XDG_CACHE_HOME\n" "rm -f timing-run-done\n" "which python\n" "conda env list\n" "env\n" "env | grep LSB_MCPU_HOSTS\n" ), - project=vars['project'], + project=var['project'], cmd_timeout=600 ) ) - elif vars['host'].lower() == 'quartz': + elif var['host'].lower() == 'quartz': # not completely configured yet ''' self.executor = HighThroughputExecutor(label=self.name, @@ -158,18 +289,43 @@ def getExecutor(self, emirge, vars): )''' raise Exception("Executor for quartz not implemented yet.") else: - raise Exception(f"Could not create Executor for {vars['host']}") + raise Exception(f"Could not create Executor for {var['host']}") def run(cls, outputs=[]): + """ Run the test. This function is called through a Parsl bash_app. + + Parameters + ---------- + cls: Driver instance, defining the test to be run + outputs: List of Parsl File objects containing the output data from the test + """ + import os + from parsl.data_provider.files import File outputs.append(File(cls.sqlfile)) return f"python -O -u -m mpi4py {os.path.join(cls.work_dir, cls.name + '.py')} -i {cls.paramsfile} {cls.execopts}" + def process(cls, var, inputs=[]): + """ Process the test results. This function is run through a Parsl python_app. + + Parameters + ---------- + cls: Driver instance, defining the test to be processed + var: dict, Listing of the overall testing variables + inputs: List of Parsl File objects containing the outputs from the test run + + Returns + ------- + Bool, True if the run completed, False otherwise + + """ + import os import subprocess import socket import datetime import platform + import yaml uname = platform.uname() timing_platform = uname.system @@ -179,31 +335,31 @@ def process(cls, var, inputs=[]): return False if os.path.isfile(cls.yaml_file_name): os.remove(cls.yaml_file_name) - summary_file_name = os.path.join(var['sql_path'], cls.summary_file_root + '_' + cls.timestamp) + '.sqlite' - if os.path.isfile(summary_file_name): - os.remove(summary_file_name) + s_file_name = os.path.join(var['sql_path'], cls.summary_file_root + '_' + cls.timestamp) + '.sqlite' + if os.path.isfile(s_file_name): + os.remove(s_file_name) # --- Pull the timings out of the sqlite files generated by logging - rgather = subprocess.Popen(f"runalyzer-gather {summary_file_name} {inputs[0]////////}", + rgather = subprocess.Popen(f"runalyzer-gather {s_file_name} {inputs[0]}", stdout=subprocess.PIPE, shell=True, text=True) outs, errs = rgather.communicate() - cld = subprocess.Popen(f"$(sqlite3 {summary_file_name} 'SELECT cl_device_name FROM runs')", + cld = subprocess.Popen(f"$(sqlite3 {s_file_name} 'SELECT cl_device_name FROM runs')", stdout=subprocess.PIPE, shell=True, text=True) cl_device, errs = cld.communicate() - stup = subprocess.Popen(f"$(runalyzer -m {summary_file_name} -c 'print(q(\"select $t_init.max\").fetchall()[0][0])' | grep -v INFO)", + stup = subprocess.Popen(f"$(runalyzer -m {s_file_name} -c 'print(q(\"select $t_init.max\").fetchall()[0][0])' | grep -v INFO)", stdout=subprocess.PIPE, shell=True, text=True) startup_time, errs = stup.communicate() - fst = subprocess.Popen(f"$(runalyzer -m {summary_file_name} -c 'print(sum(p[0] for p in q(\"select $t_step.max\").fetchall()[0:1]))' | grep -v INFO))", + fst = subprocess.Popen(f"$(runalyzer -m {s_file_name} -c 'print(sum(p[0] for p in q(\"select $t_step.max\").fetchall()[0:1]))' | grep -v INFO))", stdout=subprocess.PIPE, shell=True, text=True) first_step, errs = fst.communicate() - ften = subprocess.Popen(f"$(runalyzer -m {summary_file_name} -c 'print(sum(p[0] for p in q(\"select $t_step.max\").fetchall()[0:10]))' | grep -v INFO)", + ften = subprocess.Popen(f"$(runalyzer -m {s_file_name} -c 'print(sum(p[0] for p in q(\"select $t_step.max\").fetchall()[0:10]))' | grep -v INFO)", stdout=subprocess.PIPE, shell=True, text=True) first_10_steps, errs = ften.communicate() - sten = subprocess.Popen(f"$(runalyzer -m {summary_file_name} -c 'print(sum(p[0] for p in q(\"select $t_step.max\").fetchall()[10:19]))' | grep -v INFO)", + sten = subprocess.Popen(f"$(runalyzer -m {s_file_name} -c 'print(sum(p[0] for p in q(\"select $t_step.max\").fetchall()[10:19]))' | grep -v INFO)", stdout=subprocess.PIPE, shell=True, text=True) second_10_steps = sten.communicate() @@ -213,9 +369,9 @@ def process(cls, var, inputs=[]): 'run_epoch': datetime.datetime.now().timestamp(), 'run_platform': timing_platform, 'run_arch': timing_arch, - 'gpu_arch': vars['gpu_arch'], - 'mirge_version': vars['mirge_hash'], - 'y1_version': vars['mirge_hash'], + 'gpu_arch': var['gpu_arch'], + 'mirge_version': var['mirge_hash'], + 'y1_version': var['mirge_hash'], 'driver_version': cls.hash, 'driver_md5sum': cls.md5sum, 'time_startup': startup_time, @@ -235,11 +391,18 @@ def process(cls, var, inputs=[]): output['max_python_mem_usage'] = max_python_mem_usage output['max_gpu_mem_usage'] = max_gpu_mem_usage - yaml.dump(output, open(cls.yaml_file_name, 'w')) return True + def setup_env(args): + """ Set up the global test environment, including cloning the emirge repo and building it. + + Parameters + ---------- + args: command line arguments + + """ cwd = os.getcwd() # -- Install conda env, dependencies and MIRGE-Com via *emirge* # --- remove old run if it exists @@ -262,6 +425,7 @@ def setup_env(args): def parse_args(): + """ Parse command line arguments """ parser = argparse.ArgumentParser(description="Mirgecom timing tests") parser.add_argument("-r", "--run", type=ascii, dest="runners", action="store", nargs="?", @@ -271,19 +435,22 @@ def parse_args(): parser.add_argument("-b", "--mirge_branch", type=ascii, action='store', dest="mirge_branch", default="production", help="The branch of emirge to build, Default is production") parser.add_argument("-y", "--yml", action='store', default='testing.yaml') + parser.add_argument("-l", "--lazy", action='store_true', dest="lazy", help="Run lazy computations") parser.add_argument() return parser.parse_args() + if __name__ == '__main__': - timing_home = os.getcwd() + timing_home = os.path.realpath(os.path.dirname(__file__)) emirge_home = os.path.join(timing_home, 'emirge') cmdargs = parse_args() master_vars = yaml.load(cmdargs.yml, Loader=yaml.FullLoader) + validate(master_vars, TIMING_SCHEMA) runners = [] for yml in cmdargs.runners: - runners.append(Runner(yml)) + runners.append(Driver(yml, cmdargs.lazy, timing_home)) master_vars['mirge_hash'] = setup_env(cmdargs) executors = [] run_jobs = [] @@ -307,7 +474,7 @@ def parse_args(): for runner in runners: crunner = (bash_app(executors=[runner.name]))(run)(runner) run_jobs.append(crunner) - process_jobs.append((python_app(executors=[runner.name])))(run)(runner, master_vars, inputs=crunner.outputs[0]) + process_jobs.append((python_app(executors=[runner.name]))(run)(runner, master_vars, inputs=crunner.outputs[0])) # -- Run the case (platform-dependent) #printf "Running on Host: ${TIMING_HOST}\n" @@ -340,11 +507,11 @@ def parse_args(): # ---- Create the timing file if it does not exist count = 0 for runner, i in enumerate(runners): - if not process_job.result(): + if not process_jobs[i].result(): print(f"Timing run did not produce the expected sqlite file: {runner.sqlfile}\n") continue count += 1 - if not os.path.exists(os.path.join("timing", runner.yaml_file_name)) + if not os.path.exists(os.path.join("timing", runner.yaml_file_name)): Path(os.path.join(os.getcwd(), 'timing', runner.yaml_file_name)).touch() timing_repo.index.add([os.path.join("timing", runner.yaml_file_name)]) @@ -355,8 +522,8 @@ def parse_args(): summary_file_name = os.path.join(master_vars['sql_path'], runner.summary_file_root + '_' + runner.timestamp) + '.sqlite' shutil.copy(summary_file_name, os.path.join("timing", runner.logdir)) os.system(f"cat *.out > timing/{runner.logdir}/{runner.batch_output_file}") - timing_repo.index.add([os.path.join('timing', runner.logdir]) + timing_repo.index.add([os.path.join('timing', runner.logdir)]) # ---- Commit the new data to the repo - if count > 0 + if count > 0: commit = timing_repo.indexs.commit(f"Automatic commit: {socket.gethostname()} {TIMING_DATE}") timing_repo.remotes.origin.push() From 80ccd5534e99a6ab30ca8cd01b82c3aea6e47d45 Mon Sep 17 00:00:00 2001 From: astro-friedel Date: Mon, 3 Apr 2023 09:56:39 -0500 Subject: [PATCH 5/6] initial working version --- test_drivers/testing.yaml | 4 +- test_drivers/time-lazy-flame1d.yaml | 2 +- test_drivers/time-lazy-isolator.yaml | 2 +- test_drivers/time-lazy-nozzle.yaml | 2 +- timing.py | 218 +++++++++++++-------------- 5 files changed, 111 insertions(+), 117 deletions(-) diff --git a/test_drivers/testing.yaml b/test_drivers/testing.yaml index c74aeae5d..1345d2278 100644 --- a/test_drivers/testing.yaml +++ b/test_drivers/testing.yaml @@ -1,7 +1,7 @@ timing: - sql_path: ./log_data + sql_path: log_data project: uiuc - queue: pbatch + queue: pdebug nnodes: 1 mirge_branch: production pyopenclCTX: port:tesla diff --git a/test_drivers/time-lazy-flame1d.yaml b/test_drivers/time-lazy-flame1d.yaml index 4851ccba5..ec059fb51 100644 --- a/test_drivers/time-lazy-flame1d.yaml +++ b/test_drivers/time-lazy-flame1d.yaml @@ -2,7 +2,7 @@ driver: name: flame1d-lazy repo: illinois-ceesd/drivers_flame1d branch: main - exename: &EXE flame1d + execname: &EXE flame1d params: nviz: 100 nrestart: 100 diff --git a/test_drivers/time-lazy-isolator.yaml b/test_drivers/time-lazy-isolator.yaml index 1c5b0c332..1fc373d53 100644 --- a/test_drivers/time-lazy-isolator.yaml +++ b/test_drivers/time-lazy-isolator.yaml @@ -2,7 +2,7 @@ driver: name: y2-isolator repo: illinois-ceesd/drivers_y2-isolator branch: main - exename: &EXE isolator + execname: &EXE isolator params: nviz: 100 nrestart: 100 diff --git a/test_drivers/time-lazy-nozzle.yaml b/test_drivers/time-lazy-nozzle.yaml index 3701ecceb..a515809a0 100644 --- a/test_drivers/time-lazy-nozzle.yaml +++ b/test_drivers/time-lazy-nozzle.yaml @@ -2,7 +2,7 @@ driver: name: y1-production-nozzle-lazy repo: illinois-ceesd/drivers_y1-nozzle branch: main - exename: &EXE nozzle + execname: &EXE nozzle params: nviz: 100 nrestart: 100 diff --git a/timing.py b/timing.py index 2fba13fe7..b33fb8fb2 100644 --- a/timing.py +++ b/timing.py @@ -4,7 +4,7 @@ import os import shutil import yaml -from jsonschema import validate, exceptions +from jsonschema import validate import subprocess import datetime import parsl @@ -15,6 +15,7 @@ from parsl.executors import HighThroughputExecutor from parsl.providers import LSFProvider from parsl.launchers import JsrunLauncher +from parsl.data_provider.files import File # quartz imports # from parsl.providers import SlurmProvider # from parsl.launchers import SrunLauncher @@ -24,28 +25,6 @@ def nodeJoin(loader, nodeid): - """ Helper function to handle the !join keyword in yaml documents - - Parameters - ---------- - loader: yaml.Loader instance - nodeid: the name of the node to operate on - - Example - ------- - yaml document as follows: - - exec: &EXEC isolator - log: !join [ *EXEC, .log ] - - will produce - { - "exec": "isolator", - "log": "isolator.log" - } - - when loaded - """ return ''.join([str(item) for item in loader.construct_sequence(nodeid)]) @@ -104,7 +83,7 @@ def nodeJoin(loader, nodeid): "name": {"type": "string"}, "repo": {"type": "string"}, "branch": {"type": "string"}, - "exename": {"type": "string"}, + "execname": {"type": "string"}, "params": {"type": "object"}, "timing_env_name": {"type": "string"}, "summary_file_root": {"type": "string"}, @@ -113,7 +92,7 @@ def nodeJoin(loader, nodeid): "execopts": {"type": "string"}, "mem_usage": {"type": "boolean"} }, - "required": ["name", "repo", "branch", "exename", "params", "timing_env_name", + "required": ["name", "repo", "branch", "execname", "params", "timing_env_name", "summary_file_root", "yaml_file_name", "logdir", "execopts"] } } @@ -130,9 +109,9 @@ class Driver: root_dir: str, the root directory for the tests test_vars: dict, Listing of the overall testing variables """ - ITEMS = ['name', 'repo', 'branch', 'execname', 'params', 'pyopenclCTX', + ITEMS = ['name', 'repo', 'branch', 'execname', 'params', 'execopts', 'timing_env_name', 'yaml_file_name', 'logdir', - 'mem_usage'] + 'mem_usage', 'summary_file_root'] def __init__(self, ymlFile, lazy, root_dir, test_vars): self.timestamp = datetime.datetime.now().strftime("%Y.%m.%d-%H.%M.%S") @@ -146,7 +125,7 @@ def __init__(self, ymlFile, lazy, root_dir, test_vars): self.sqlfile = None self.mem_usage = False self.root_dir = root_dir - self.yamlFile = "time-" + self.yamlFile = "test_drivers/time-" if lazy: self.yamlFile += "lazy-" self.yamlFile += ymlFile @@ -165,7 +144,7 @@ def verify_runner(self): """ Function to determine the actual name of the yaml file. It tries: yamlFile, yamlFile + '.yml', and yamlFile + '.yaml' """ - self.fname = os.path.join(self.root_dir, 'timing', self.yamlFile) + self.fname = os.path.join(self.root_dir, self.yamlFile) if os.path.isfile(self.fname): pass elif os.path.isfile(self.fname + '.yml'): @@ -173,7 +152,7 @@ def verify_runner(self): elif os.path.isfile(self.fname + '.yaml'): self.fname += '.yaml' else: - self.fname = None + raise Exception(f"Cound not find file. {self.fname}") def get(self, item): """ General getter function""" @@ -193,17 +172,14 @@ def loadYaml(self, var): """ self.verify_runner() if self.fname: - try: - validate(yaml.load(self.fname), DRIVER_SCHEMA) - except exceptions.ValidationError: - return False - drv = yaml.load(open(self.fname, 'r'), Loader=yaml.FullLoader) + drvr = yaml.load(open(self.fname, 'r'), Loader=yaml.FullLoader) + validate(drvr, DRIVER_SCHEMA) for it in Driver.ITEMS: - setattr(self, it, drv['driver'][it]) + setattr(self, it, drvr['driver'][it]) self.batch_output_file = f"{self.summary_file_root}_{self.timestamp}.out" - self.sqlfile = os.path.join(var['sql_path'], self.name) + '-rank0.sqlite' + self.sqlfile = os.path.join(var['sql_path'], self.execname) + '-rank0.sqlite' return True return False @@ -215,7 +191,7 @@ def get_driver(self): Repo.clone_from(f"https://github.com/{self.repo}", os.path.join(os.getcwd(), self.name), branch=self.branch) - os.chdir(os.path.join(self.name, "timing_run")) + os.chdir(os.path.join(self.name)) self.work_dir = os.getcwd() drepo = Repo(os.getcwd()) self.hash = drepo.rev_parse("HEAD").hexsha @@ -238,32 +214,31 @@ def getExecutor(self, emirge, var): var: dict, Listing of the overall testing variables """ if var['host'].lower() == 'lassen': - self.executor = HighThroughputExecutor(label=self.name, - working_dir=self.work_dir, - address='lassen.llnl.gov', - worker_port_range=(5000, 55000), - worker_debug=True, - provider=LSFProvider(launcher=JsrunLauncher(overrides=''), - walltime='01:00:00', - nodes_per_block=var['nnodes'], - init_blocks=var['nnodes'], - max_blocks=var['nnodes'], - bsub_redirection=True, - queue=var['queue'], - worker_init=("module load spectrum-mpi\n" - f"source {os.path.join(emirge, 'config', 'activate_env.sh')}\n" - f"export PYOPENCL_CTX=\"{var['pyopenclCTX']}\"\n" - f"export XDG_CACHE_HOME=\"{os.path.join(os.sep, 'tmp', '$USER', 'xdg-scratch', self.name)}\"\n" - "rm -rf $XDG_CACHE_HOME\n" - "rm -f timing-run-done\n" - "which python\n" - "conda env list\n" - "env\n" - "env | grep LSB_MCPU_HOSTS\n" - ), - project=var['project'], - cmd_timeout=600 - ) + return HighThroughputExecutor(label=self.name, + working_dir=self.work_dir, + address='lassen.llnl.gov', + worker_port_range=(5000, 55000), + worker_debug=True, + provider=LSFProvider(launcher=JsrunLauncher(overrides=''), + walltime='01:00:00', + nodes_per_block=var['nnodes'], + init_blocks=var['nnodes'], + max_blocks=var['nnodes'], + bsub_redirection=True, + queue=var['queue'], + worker_init=("module load spectrum-mpi\n" + f"source {os.path.join(emirge, 'miniforge3', 'bin', 'activate')} timing_tests\n" + f"export PYOPENCL_CTX=\"{var['pyopenclCTX']}\"\n" + f"export XDG_CACHE_HOME=\"{os.path.join(os.sep, 'tmp', '$USER', 'xdg-scratch', self.name)}\"\n" + "rm -rf $XDG_CACHE_HOME\n" + "rm -f timing-run-done\n" + "which python\n" + "conda env list\n" + f"cd {os.path.join(self.work_dir, 'timing_run')}\n" + ), + project=var['project'], + cmd_timeout=600 + ) ) elif var['host'].lower() == 'quartz': @@ -292,27 +267,44 @@ def getExecutor(self, emirge, var): raise Exception(f"Could not create Executor for {var['host']}") -def run(cls, outputs=[]): +def run(work_dir, paramsfile, execn, execopts, outputs=[]): """ Run the test. This function is called through a Parsl bash_app. Parameters ---------- - cls: Driver instance, defining the test to be run + work_dir: str + root working directory + paramsfile: str + Filename for the parameters file + execn: str + Name of the executable + execopts: str + Any command line flags/options for the execn outputs: List of Parsl File objects containing the output data from the test """ import os - from parsl.data_provider.files import File - outputs.append(File(cls.sqlfile)) - return f"python -O -u -m mpi4py {os.path.join(cls.work_dir, cls.name + '.py')} -i {cls.paramsfile} {cls.execopts}" + return f"python -O -u -m mpi4py {os.path.join(work_dir, 'timing_run', execn + '.py')} -i {paramsfile} {execopts}" -def process(cls, var, inputs=[]): +def process(summary_file_root, timestamp, yaml_file_name, hsh, md5sum, var, mem_usage, inputs=[]): """ Process the test results. This function is run through a Parsl python_app. Parameters ---------- - cls: Driver instance, defining the test to be processed - var: dict, Listing of the overall testing variables + summary_file_root: str + Root path for the summary file + timestamp: str + Timestamp the process was started + var: dict + Listing of the overall testing variables + yaml_file_name: str + Name of the yaml control file + hsh: str + Hash of the repo + md5sum: str + Md5sum of the exec file + mem_usage: bool + Whether to calculate memory usage for the test inputs: List of Parsl File objects containing the outputs from the test run Returns @@ -333,31 +325,31 @@ def process(cls, var, inputs=[]): if not os.path.isfile(inputs[0]): return False - if os.path.isfile(cls.yaml_file_name): - os.remove(cls.yaml_file_name) - s_file_name = os.path.join(var['sql_path'], cls.summary_file_root + '_' + cls.timestamp) + '.sqlite' + if os.path.isfile(yaml_file_name): + os.remove(yaml_file_name) + s_file_name = os.path.join(var['sql_path'], summary_file_root + '_' + timestamp) + '.sqlite' if os.path.isfile(s_file_name): os.remove(s_file_name) # --- Pull the timings out of the sqlite files generated by logging rgather = subprocess.Popen(f"runalyzer-gather {s_file_name} {inputs[0]}", stdout=subprocess.PIPE, shell=True, text=True) - outs, errs = rgather.communicate() + _, _ = rgather.communicate() cld = subprocess.Popen(f"$(sqlite3 {s_file_name} 'SELECT cl_device_name FROM runs')", stdout=subprocess.PIPE, shell=True, text=True) - cl_device, errs = cld.communicate() + cl_device, _ = cld.communicate() stup = subprocess.Popen(f"$(runalyzer -m {s_file_name} -c 'print(q(\"select $t_init.max\").fetchall()[0][0])' | grep -v INFO)", stdout=subprocess.PIPE, shell=True, text=True) - startup_time, errs = stup.communicate() + startup_time, _ = stup.communicate() fst = subprocess.Popen(f"$(runalyzer -m {s_file_name} -c 'print(sum(p[0] for p in q(\"select $t_step.max\").fetchall()[0:1]))' | grep -v INFO))", stdout=subprocess.PIPE, shell=True, text=True) - first_step, errs = fst.communicate() + first_step, _ = fst.communicate() ften = subprocess.Popen(f"$(runalyzer -m {s_file_name} -c 'print(sum(p[0] for p in q(\"select $t_step.max\").fetchall()[0:10]))' | grep -v INFO)", stdout=subprocess.PIPE, shell=True, text=True) - first_10_steps, errs = ften.communicate() + first_10_steps, _ = ften.communicate() sten = subprocess.Popen(f"$(runalyzer -m {s_file_name} -c 'print(sum(p[0] for p in q(\"select $t_step.max\").fetchall()[10:19]))' | grep -v INFO)", stdout=subprocess.PIPE, shell=True, text=True) @@ -372,26 +364,26 @@ def process(cls, var, inputs=[]): 'gpu_arch': var['gpu_arch'], 'mirge_version': var['mirge_hash'], 'y1_version': var['mirge_hash'], - 'driver_version': cls.hash, - 'driver_md5sum': cls.md5sum, + 'driver_version': hsh, + 'driver_md5sum': md5sum, 'time_startup': startup_time, 'time_first_step': first_step, 'time_first_10': first_10_steps, 'time_second_10': second_10_steps} - if cls.mem_usage: - mpmu = subprocess.Popen(f"$(runalyzer -m {summary_file_name} -c 'print(max(p[0] for p in q(\"select $memory_usage_python.max\").fetchall()))' | grep -v INFO)", + if mem_usage: + mpmu = subprocess.Popen(f"$(runalyzer -m {s_file_name} -c 'print(max(p[0] for p in q(\"select $memory_usage_python.max\").fetchall()))' | grep -v INFO)", stdout=subprocess.PIPE, shell=True, text=True) max_python_mem_usage, err = mpmu.communicate() - mgmu = subprocess.Popen(f"$(runalyzer -m {summary_file_name} -c 'print(max(p[0] for p in q(\"select $memory_usage_gpu.max\").fetchall()))' | grep -v INFO)", + mgmu = subprocess.Popen(f"$(runalyzer -m {s_file_name} -c 'print(max(p[0] for p in q(\"select $memory_usage_gpu.max\").fetchall()))' | grep -v INFO)", stdout=subprocess.PIPE, shell=True, text=True) max_gpu_mem_usage, err = mgmu.communicate() output['max_python_mem_usage'] = max_python_mem_usage output['max_gpu_mem_usage'] = max_gpu_mem_usage - yaml.dump(output, open(cls.yaml_file_name, 'w')) + yaml.dump(output, open(yaml_file_name, 'w')) return True @@ -414,7 +406,7 @@ def setup_env(args): Repo.clone_from("https://github.com/illinois-ceesd/emirge.git", os.path.join(cwd, "emirge")) os.chdir("emirge") - os.system(f"./install.sh --branch=${args.mirge_branch} --env-name=timing_tests") + os.system(f"./install.sh --branch={args.mirge_branch} --env-name=timing_tests") os.chdir("mirgecom") mrepo = Repo(os.getcwd()) # -- Grab and merge the branch with the case-dependent features @@ -427,16 +419,14 @@ def setup_env(args): def parse_args(): """ Parse command line arguments """ parser = argparse.ArgumentParser(description="Mirgecom timing tests") - parser.add_argument("-r", "--run", type=ascii, dest="runners", - action="store", nargs="?", + parser.add_argument("-r", "--run", dest="runners", action="store", help="The tests to run (must match yaml file name)") - parser.add_argument("-m", "--build_mirgecom", actio='store_true', + parser.add_argument("-m", "--build_mirgecom", action='store_true', dest="buld_mirecom", help="Whether to build mirgecom") - parser.add_argument("-b", "--mirge_branch", type=ascii, action='store', dest="mirge_branch", + parser.add_argument("-b", "--mirge_branch", action='store', dest="mirge_branch", default="production", help="The branch of emirge to build, Default is production") parser.add_argument("-y", "--yml", action='store', default='testing.yaml') parser.add_argument("-l", "--lazy", action='store_true', dest="lazy", help="Run lazy computations") - parser.add_argument() return parser.parse_args() @@ -445,39 +435,43 @@ def parse_args(): emirge_home = os.path.join(timing_home, 'emirge') cmdargs = parse_args() - master_vars = yaml.load(cmdargs.yml, Loader=yaml.FullLoader) + cmdargs.runners = cmdargs.runners.replace("'", "").strip().split(',') + print(cmdargs.runners) + master_vars = yaml.load(open(os.path.join('test_drivers', cmdargs.yml), 'r'), Loader=yaml.FullLoader) validate(master_vars, TIMING_SCHEMA) - + master_vars = master_vars['timing'] runners = [] - for yml in cmdargs.runners: - runners.append(Driver(yml, cmdargs.lazy, timing_home)) - master_vars['mirge_hash'] = setup_env(cmdargs) executors = [] - run_jobs = [] - process_jobs = [] - for runner in runners: - - runner.get_driver() - # --- Get an MD5Sum for the untracked timing driver - + for yml in cmdargs.runners: + drv = (Driver(yml, cmdargs.lazy, timing_home, master_vars)) + drv.get_driver() md5_hash = hashlib.md5() - with open(runner.get('execname') + '.py', 'rb') as fh: + with open(os.path.join(drv.work_dir, 'timing_run', drv.get('execname')) + '.py', 'rb') as fh: for block in iter(lambda: fh.read(4096), b""): md5_hash.update(block) - runner.setMD5(md5_hash.hexdigest()) - runner.getExecutor(emirge_home, master_vars) - runner.write_params() - executors.append(runner.executor) + drv.setMD5(md5_hash.hexdigest()) + executors.append(drv.getExecutor(emirge_home, master_vars)) + drv.write_params() + runners.append(drv) + + master_vars['mirge_hash'] = setup_env(cmdargs) + run_jobs = [] + process_jobs = [] parsl.set_stream_logger() parsl.clear() parsl.load(Config(executors=executors)) for runner in runners: - crunner = (bash_app(executors=[runner.name]))(run)(runner) + print(os.path.join(os.getcwd(), runner.sqlfile)) + crunner = bash_app(executors=[runner.name])(run)(runner.work_dir, runner.name, runner.paramsfile, runner.execname, runner.execopts, outputs=[File(os.path.join(runner.work_dir, 'timing_run', runner.sqlfile))]) run_jobs.append(crunner) - process_jobs.append((python_app(executors=[runner.name]))(run)(runner, master_vars, inputs=crunner.outputs[0])) + + process_jobs.append(python_app(executors=[runner.name])(process)(runner.summary_file_root, runner.timestamp, runner.yaml_file_name, runner.hash, runner.md5sum, master_vars, runner.mem_usage, inputs=[crunner.outputs[0]])) + + for job in process_jobs: + job.result() # -- Run the case (platform-dependent) - #printf "Running on Host: ${TIMING_HOST}\n" + # printf "Running on Host: ${TIMING_HOST}\n" # Users should set special keys for using git over # ssh for security concerns. This snippet will use # a pre-arranged ssh key if the user provides one From c2db509736bf44a42cf2cdf0e1448eae60bf9f39 Mon Sep 17 00:00:00 2001 From: astro_friedel Date: Wed, 21 Jun 2023 14:15:44 -0700 Subject: [PATCH 6/6] working version of workflow after bug fixes in driver --- test_drivers/time-lazy-flame1d.yaml | 4 +- timing.py | 234 ++++++++++++++-------------- 2 files changed, 115 insertions(+), 123 deletions(-) mode change 100644 => 100755 timing.py diff --git a/test_drivers/time-lazy-flame1d.yaml b/test_drivers/time-lazy-flame1d.yaml index ec059fb51..93c2b2d6d 100644 --- a/test_drivers/time-lazy-flame1d.yaml +++ b/test_drivers/time-lazy-flame1d.yaml @@ -1,7 +1,7 @@ driver: name: flame1d-lazy repo: illinois-ceesd/drivers_flame1d - branch: main + branch: update-to-y3 execname: &EXE flame1d params: nviz: 100 @@ -20,6 +20,6 @@ driver: timing_env_name: !join [*EXE, .lazy.timing.env] summary_file_root: !join [*EXE, _lazy] yaml_file_name: !join [*EXE, -timings.yaml] - logdir: !join [*EXE, _logs] + logdir: !join [*EXE, _lazy_logs] execopts: --lazy --log mem_usage: False diff --git a/timing.py b/timing.py old mode 100644 new mode 100755 index b33fb8fb2..2e1fa6d0c --- a/timing.py +++ b/timing.py @@ -4,7 +4,7 @@ import os import shutil import yaml -from jsonschema import validate +from jsonschema import validate, exceptions import subprocess import datetime import parsl @@ -16,6 +16,7 @@ from parsl.providers import LSFProvider from parsl.launchers import JsrunLauncher from parsl.data_provider.files import File + # quartz imports # from parsl.providers import SlurmProvider # from parsl.launchers import SrunLauncher @@ -25,13 +26,34 @@ def nodeJoin(loader, nodeid): + """ Helper function to handle the !join keyword in yaml documents + + Parameters + ---------- + loader: yaml.Loader instance + nodeid: the name of the node to operate on + + Example + ------- + yaml document as follows: + + exec: &EXEC isolator + log: !join [ *EXEC, .log ] + + will produce + { + "exec": "isolator", + "log": "isolator.log" + } + + when loaded + """ return ''.join([str(item) for item in loader.construct_sequence(nodeid)]) # add the nodeJoin function to the yal Loader yaml.add_constructor('!join', nodeJoin) -TIMING_DATE = datetime.datetime.now().strftime("%Y-%m-%d %H:%M") print(datetime.datetime.now()) @@ -66,7 +88,7 @@ def nodeJoin(loader, nodeid): } } -# schema for the driver yamls +# schema for the driver yamls DRIVER_SCHEMA = {"title": "Driver Schema", "description": "Schema for testing driver", "type": "object", @@ -98,16 +120,15 @@ def nodeJoin(loader, nodeid): } } - class Driver: - """ Class to define a test instance - - Parameters - ---------- - ymlFile: str, the name of the yaml file giving the parameters for the run - lazy: bool, whether the test is a lazy test - root_dir: str, the root directory for the tests - test_vars: dict, Listing of the overall testing variables + """ Class to define a test instance + + Parameters + ---------- + ymlFile: str, the name of the yaml file giving the parameters for the run + lazy: bool, whether the test is a lazy test + root_dir: str, the root directory for the tests + test_vars: dict, Listing of the overall testing variables """ ITEMS = ['name', 'repo', 'branch', 'execname', 'params', 'execopts', 'timing_env_name', 'yaml_file_name', 'logdir', @@ -132,7 +153,7 @@ def __init__(self, ymlFile, lazy, root_dir, test_vars): for it in Driver.ITEMS: setattr(self, it, None) - # load the given yaml file + # load the given yaml file if not self.loadYaml(test_vars): raise Exception(f"Could not load test runner {self.yamlFile}") @@ -141,8 +162,8 @@ def setMD5(self, msum): self.md5sum = msum def verify_runner(self): - """ Function to determine the actual name of the yaml file. It tries: - yamlFile, yamlFile + '.yml', and yamlFile + '.yaml' + """ Function to determine the actual name of the yaml file. It tries: + yamlFile, yamlFile + '.yml', and yamlFile + '.yaml' """ self.fname = os.path.join(self.root_dir, self.yamlFile) if os.path.isfile(self.fname): @@ -159,24 +180,24 @@ def get(self, item): return getattr(self, item, None) def loadYaml(self, var): - """ Function to load the driver yaml file, this includes error checking against - the schema, and adding the loaded values to the class member variables - - Parameters - ---------- - var: dict, Listing of the overall testing variables - - Returns - ------- - Bool, True if the yaml was loaded, False otherwise. + """ Function to load the driver yaml file, this includes error checking against + the schema, and adding the loaded values to the class member variables + + Parameters + ---------- + var: dict, Listing of the overall testing variables + + Returns + ------- + Bool, True if the yaml was loaded, False otherwise. """ self.verify_runner() if self.fname: - drvr = yaml.load(open(self.fname, 'r'), Loader=yaml.FullLoader) - validate(drvr, DRIVER_SCHEMA) + drv = yaml.load(open(self.fname, 'r'), Loader=yaml.FullLoader) + validate(drv, DRIVER_SCHEMA) for it in Driver.ITEMS: - setattr(self, it, drvr['driver'][it]) + setattr(self, it, drv['driver'][it]) self.batch_output_file = f"{self.summary_file_root}_{self.timestamp}.out" self.sqlfile = os.path.join(var['sql_path'], self.execname) + '-rank0.sqlite' @@ -184,7 +205,7 @@ def loadYaml(self, var): return False def get_driver(self): - """ Function to clone the given driver repo + """ Function to clone the given driver repo """ cwd = os.getcwd() os.system(f"rm -rf {self.name}") @@ -199,19 +220,19 @@ def get_driver(self): os.chdir(cwd) def write_params(self): - """ Write the driver parameters to a yaml file for the test to run with + """ Write the driver parameters to a yaml file for the test to run with """ self.paramsfile = os.path.join(self.work_dir, self.name) + "_timing_params.yaml" with open(self.paramsfile, 'w') as yfh: yaml.dump(self.params, yfh) def getExecutor(self, emirge, var): - """ Define the Parsl Executor to use with this test. - - Parameters - ---------- - emirge: str, path to the emirge directory - var: dict, Listing of the overall testing variables + """ Define the Parsl Executor to use with this test. + + Parameters + ---------- + emirge: str, path to the emirge directory + var: dict, Listing of the overall testing variables """ if var['host'].lower() == 'lassen': return HighThroughputExecutor(label=self.name, @@ -242,44 +263,37 @@ def getExecutor(self, emirge, var): ) elif var['host'].lower() == 'quartz': - # not completely configured yet - ''' - self.executor = HighThroughputExecutor(label=self.name, - working_dir='', - address='quartz.llnl.gov', - worker_port_range=(50000, 55000), - worker_debug=True, - provider=SlurmProvider(launcher=SrunLauncher(overrides=f''), - walltime='02:00:00', - nodes_per_block=1, - init_blocks=1, - max_blocks=1, - scheduler_options='#SBATCH -q pdebug', - worker_init=("module load spectrum-mpi\n" - f"export XDG_CACHE_HOME=/tmp/$USER/xdg-scratch_{self.name}\n" - "conda activate ...\n" - ), - cmd_timeout=600 - ) + # not completely configured yet + ''' + self.executor = HighThroughputExecutor(label=self.name, + working_dir='', + address='quartz.llnl.gov', + worker_port_range=(50000, 55000), + worker_debug=True, + provider=SlurmProvider(launcher=SrunLauncher(overrides=f''), + walltime='02:00:00', + nodes_per_block=1, + init_blocks=1, + max_blocks=1, + scheduler_options='#SBATCH -q pdebug', + worker_init=("module load spectrum-mpi\n" + f"export XDG_CACHE_HOME=/tmp/$USER/xdg-scratch_{self.name}\n" + "conda activate ...\n" + ), + cmd_timeout=600 + ) )''' raise Exception("Executor for quartz not implemented yet.") else: raise Exception(f"Could not create Executor for {var['host']}") -def run(work_dir, paramsfile, execn, execopts, outputs=[]): +def run(work_dir, name, paramsfile, execn, execopts, outputs=[]): """ Run the test. This function is called through a Parsl bash_app. Parameters ---------- - work_dir: str - root working directory - paramsfile: str - Filename for the parameters file - execn: str - Name of the executable - execopts: str - Any command line flags/options for the execn + cls: Driver instance, defining the test to be run outputs: List of Parsl File objects containing the output data from the test """ import os @@ -291,20 +305,8 @@ def process(summary_file_root, timestamp, yaml_file_name, hsh, md5sum, var, mem_ Parameters ---------- - summary_file_root: str - Root path for the summary file - timestamp: str - Timestamp the process was started - var: dict - Listing of the overall testing variables - yaml_file_name: str - Name of the yaml control file - hsh: str - Hash of the repo - md5sum: str - Md5sum of the exec file - mem_usage: bool - Whether to calculate memory usage for the test + cls: Driver instance, defining the test to be processed + var: dict, Listing of the overall testing variables inputs: List of Parsl File objects containing the outputs from the test run Returns @@ -319,6 +321,7 @@ def process(summary_file_root, timestamp, yaml_file_name, hsh, md5sum, var, mem_ import platform import yaml + TIMING_DATE = datetime.datetime.now().strftime("%Y-%m-%d %H:%M") uname = platform.uname() timing_platform = uname.system timing_arch = uname.machine @@ -334,22 +337,22 @@ def process(summary_file_root, timestamp, yaml_file_name, hsh, md5sum, var, mem_ rgather = subprocess.Popen(f"runalyzer-gather {s_file_name} {inputs[0]}", stdout=subprocess.PIPE, shell=True, text=True) - _, _ = rgather.communicate() + outs, errs = rgather.communicate() cld = subprocess.Popen(f"$(sqlite3 {s_file_name} 'SELECT cl_device_name FROM runs')", stdout=subprocess.PIPE, shell=True, text=True) - cl_device, _ = cld.communicate() + cl_device, errs = cld.communicate() stup = subprocess.Popen(f"$(runalyzer -m {s_file_name} -c 'print(q(\"select $t_init.max\").fetchall()[0][0])' | grep -v INFO)", stdout=subprocess.PIPE, shell=True, text=True) - startup_time, _ = stup.communicate() + startup_time, errs = stup.communicate() fst = subprocess.Popen(f"$(runalyzer -m {s_file_name} -c 'print(sum(p[0] for p in q(\"select $t_step.max\").fetchall()[0:1]))' | grep -v INFO))", stdout=subprocess.PIPE, shell=True, text=True) - first_step, _ = fst.communicate() + first_step, errs = fst.communicate() ften = subprocess.Popen(f"$(runalyzer -m {s_file_name} -c 'print(sum(p[0] for p in q(\"select $t_step.max\").fetchall()[0:10]))' | grep -v INFO)", stdout=subprocess.PIPE, shell=True, text=True) - first_10_steps, _ = ften.communicate() + first_10_steps, errs = ften.communicate() sten = subprocess.Popen(f"$(runalyzer -m {s_file_name} -c 'print(sum(p[0] for p in q(\"select $t_step.max\").fetchall()[10:19]))' | grep -v INFO)", stdout=subprocess.PIPE, shell=True, text=True) @@ -443,6 +446,7 @@ def parse_args(): runners = [] executors = [] for yml in cmdargs.runners: + print(yml) drv = (Driver(yml, cmdargs.lazy, timing_home, master_vars)) drv.get_driver() md5_hash = hashlib.md5() @@ -457,67 +461,55 @@ def parse_args(): master_vars['mirge_hash'] = setup_env(cmdargs) run_jobs = [] process_jobs = [] + #for runner in runners: + + # --- Get an MD5Sum for the untracked timing driver parsl.set_stream_logger() parsl.clear() parsl.load(Config(executors=executors)) for runner in runners: + print("RUNNER") print(os.path.join(os.getcwd(), runner.sqlfile)) crunner = bash_app(executors=[runner.name])(run)(runner.work_dir, runner.name, runner.paramsfile, runner.execname, runner.execopts, outputs=[File(os.path.join(runner.work_dir, 'timing_run', runner.sqlfile))]) run_jobs.append(crunner) process_jobs.append(python_app(executors=[runner.name])(process)(runner.summary_file_root, runner.timestamp, runner.yaml_file_name, runner.hash, runner.md5sum, master_vars, runner.mem_usage, inputs=[crunner.outputs[0]])) - for job in process_jobs: - job.result() - - # -- Run the case (platform-dependent) - # printf "Running on Host: ${TIMING_HOST}\n" - # Users should set special keys for using git over - # ssh for security concerns. This snippet will use - # a pre-arranged ssh key if the user provides one - # and indicates it with the TESTING_SSH_KEY environment - # variable. - # ===== To create a key: - # - Run ssh-keygen: - # $ ssh-keygen - # [enter a when prompted] - # - Put the key(s) in a /secure/filesystem/location: - # $ mv * /secure/filesystem/location - # - Add the key to GIT: - # $ [browse to] https://github.com/illinois-ceesd/timing/settings/keys/new - # $ Choose (New SSH key) - # $ Paste in the contents of /secure/filesystem/location/.pub - # - Set the ENV variable before using this script: - # $ export TESTING_SSH_KEY=/secure/filesystem/location/ + for job in run_jobs: + print(f"JOB STATUS {job.result()}") + print(len(process_jobs)) + print("JOBS DONE") + if 'TESTING_SSH_KEY' in os.environ: ssh_job = subprocess.Popen("eval $(ssh-agent); trap \"kill $SSH_AGENT_PID\" EXIT; ssh-add ${TESTING_SSH_KEY}", stdout=subprocess.PIPE, shell=True, text=True) # --- Update the timing data in the repo # ---- First, clone the timing repo - Repo.clone_from(f"git@github.com:{master_vars['timing_repo']}", os.path.join(os.getcwd(), 'timing'), - branch=master_vars['timing_branch']) - timing_repo = Repo(os.path.join(os.getcwd(), 'timing')) + #Repo.clone_from(f"git@github.com:{master_vars['timing_repo']}", os.path.join(os.getcwd(), 'timing'), + # branch=master_vars['timing_branch']) + timing_repo = Repo(os.getcwd()) # ---- Create the timing file if it does not exist count = 0 - for runner, i in enumerate(runners): + for i, runner in enumerate(runners): if not process_jobs[i].result(): print(f"Timing run did not produce the expected sqlite file: {runner.sqlfile}\n") continue count += 1 - if not os.path.exists(os.path.join("timing", runner.yaml_file_name)): - Path(os.path.join(os.getcwd(), 'timing', runner.yaml_file_name)).touch() - timing_repo.index.add([os.path.join("timing", runner.yaml_file_name)]) + if not os.path.exists(runner.yaml_file_name): + Path(os.path.join(os.getcwd(), runner.yaml_file_name)).touch() + timing_repo.index.add([runner.yaml_file_name]) # ---- Update the timing file with the current test data - os.system(f"cat {runner.yaml_file_name} >> timing/{runner.yaml_file_name}") - os.makedirs(os.path.join('timin', runner.logdir), exist_ok=True) - summary_file_name = os.path.join(master_vars['sql_path'], runner.summary_file_root + '_' + runner.timestamp) + '.sqlite' - shutil.copy(summary_file_name, os.path.join("timing", runner.logdir)) + #os.system(f"cat {runner.yaml_file_name} >> timing/{runner.yaml_file_name}") + #os.makedirs(runner.logdir, exist_ok=True) + summary_file_name = os.path.join('flame1d-lazy', 'timing_run', master_vars['sql_path'], runner.summary_file_root + '_' + runner.timestamp) + '.sqlite' + shutil.copy(summary_file_name, runner.logdir) os.system(f"cat *.out > timing/{runner.logdir}/{runner.batch_output_file}") - timing_repo.index.add([os.path.join('timing', runner.logdir)]) - # ---- Commit the new data to the repo - if count > 0: - commit = timing_repo.indexs.commit(f"Automatic commit: {socket.gethostname()} {TIMING_DATE}") - timing_repo.remotes.origin.push() + timing_repo.index.add([os.path.join(runner.logdir, runner.summary_file_root + '_' + runner.timestamp + '.sqlite')]) + +# ---- Commit the new data to the repo +if count > 0: + commit = timing_repo.indexs.commit(f"Automatic commit: {socket.gethostname()} {TIMING_DATE}") + timing_repo.remotes.origin.push()