diff --git a/.gitignore b/.gitignore
index 89eb912..07d8d98 100644
--- a/.gitignore
+++ b/.gitignore
@@ -131,5 +131,6 @@ dmypy.json
 exp__*
 experiments/simple_raytune/benchmark__RaytuneBenchmark
 experiments/optuna_minikube/benchmark__OptunaMinikubeBenchmark
+experiments/katib_minikube/benchmark__KatibBenchmark
 data/
\ No newline at end of file
diff --git a/experiments/katib_minikube/experiment_template.yaml b/experiments/katib_minikube/experiment_template.yaml
new file mode 100644
index 0000000..1e719a9
--- /dev/null
+++ b/experiments/katib_minikube/experiment_template.yaml
@@ -0,0 +1,52 @@
+---
+apiVersion: kubeflow.org/v1beta1
+kind: Experiment
+metadata:
+  namespace: kubeflow
+  name: $study_name
+spec:
+  objective:
+    type: maximize
+    goal: 0.8
+    objectiveMetricName: precision
+  algorithm:
+    algorithmName: grid
+  parallelTrialCount: $worker_num
+  maxTrialCount: $jobs_num
+  maxFailedTrialCount: $worker_num
+  parameters:
+    - name: lr
+      parameterType: double
+      feasibleSpace:
+        min: "0.001"
+        max: "0.02"
+        step: "0.001"
+  trialTemplate:
+    primaryContainerName: training-container
+    trialParameters:
+      - name: learningRate
+        description: Learning rate for the training model
+        reference: lr
+    trialSpec:
+      apiVersion: batch/v1
+      kind: Job
+      spec:
+        template:
+          spec:
+            containers:
+              - name: training-container
+                image: $worker_image
+                imagePullPolicy: IfNotPresent
+                command:
+                  - "python3"
+                  - "experiments/katib_minikube/$worker_image/task.py"
+                  - "--batch-size=64"
+                  - "--lr=$trialParameters"
+                env:
+                  - name: STUDY_NAME
+                    value: "$study_name"
+                  - name: DB_CONN
+                    value: "postgresql://postgresadmin:admin123@postgres:5432/postgresdb"
+                  - name: "METRICS_STORAGE_HOST"
+                    value: "$metrics_ip"
+            restartPolicy: Never
diff --git a/experiments/katib_minikube/grid.yaml b/experiments/katib_minikube/grid.yaml
new file mode 100644
index 0000000..04c4f67
--- /dev/null
+++ b/experiments/katib_minikube/grid.yaml
@@ -0,0 +1,52 @@
+---
+apiVersion: kubeflow.org/v1beta1
+kind: Experiment
+metadata:
+  namespace: kubeflow
+  name: katib-study-32
+spec:
+  objective:
+    type: maximize
+    goal: 0.8
+    objectiveMetricName: precision
+  algorithm:
+    algorithmName: grid
+  parallelTrialCount: 2
+  maxTrialCount: 2
+  maxFailedTrialCount: 2
+  parameters:
+    - name: lr
+      parameterType: double
+      feasibleSpace:
+        min: "0.001"
+        max: "0.02"
+        step: "0.001"
+  trialTemplate:
+    primaryContainerName: training-container
+    trialParameters:
+      - name: learningRate
+        description: Learning rate for the training model
+        reference: lr
+    trialSpec:
+      apiVersion: batch/v1
+      kind: Job
+      spec:
+        template:
+          spec:
+            containers:
+              - name: training-container
+                image: mnist_task
+                imagePullPolicy: IfNotPresent
+                command:
+                  - "python3"
+                  - "experiments/katib_minikube/mnist_task/task.py"
+                  - "--batch-size=64"
+                  - "--lr=${trialParameters.learningRate}"
+                env:
+                  - name: STUDY_NAME
+                    value: "katib-study-32"
+                  - name: DB_CONN
+                    value: "postgresql://postgresadmin:admin123@postgres:5432/postgresdb"
+                  - name: "METRICS_STORAGE_HOST"
+                    value: "134.101.4.161"
+            restartPolicy: Never
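grid.yaml above is the rendered form of experiment_template.yaml. A minimal sketch of how setup() (below) fills the template, using the values that produced grid.yaml: string.Template cannot emit Katib's `${trialParameters.learningRate}` placeholder directly (the dot is not a valid template identifier), so the template carries a plain `$trialParameters` and the literal Katib placeholder is passed in as a substitution value.

```python
from string import Template

with open("experiments/katib_minikube/experiment_template.yaml") as f:
    template = Template(f.read())

rendered = template.substitute({
    "study_name": "katib-study-32",
    "worker_num": 2,
    "jobs_num": 2,
    "worker_image": "mnist_task",
    # passed through verbatim so Katib, not Python, resolves it per trial
    "trialParameters": "${trialParameters.learningRate}",
    "metrics_ip": "134.101.4.161",
})

with open("experiments/katib_minikube/grid.yaml", "w") as f:
    f.write(rendered)
```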
diff --git a/experiments/katib_minikube/katib_benchmark.py b/experiments/katib_minikube/katib_benchmark.py
new file mode 100644
index 0000000..d4016b3
--- /dev/null
+++ b/experiments/katib_minikube/katib_benchmark.py
@@ -0,0 +1,310 @@
+import os
+import random
+import logging as log
+from os import path
+from string import Template
+from time import sleep
+from urllib.request import urlopen
+
+import yaml
+from kubernetes import client, config, watch
+from kubernetes.client.rest import ApiException
+
+from ml_benchmark.benchmark_runner import Benchmark
+from ml_benchmark.utils.image_build_wrapper import builder_from_string
+
+
+class KatibBenchmark(Benchmark):
+
+    def __init__(self, resources) -> None:
+        self.resources = resources
+        self.group = "kubeflow.org"
+        self.version = "v1beta1"
+        self.namespace = "kubeflow"
+        self.plural = "experiments"
+        self.experiment_file_name = "grid.yaml"
+        self.metrics_ip = resources.get("metricsIP")
+        self.generate_new_docker_image = resources.get("generateNewDockerImage", True)
+        self.clean_up = self.resources.get("cleanUp", True)
+        self.trial_tag = resources.get("dockerImageTag", "mnist_task")
+        self.study_name = resources.get("studyName", f'katib-study-{random.randint(0, 100)}')
+        self.workerCpu = resources.get("workerCpu", 2)
+        self.workerMemory = resources.get("workerMemory", 2)
+        self.workerCount = resources.get("workerCount", 5)
+        self.jobsCount = resources.get("jobsCount", 6)
+
+        self.logging_level = self.resources.get("loggingLevel", log.INFO)
+        log.basicConfig(
+            format='%(asctime)s Katib Benchmark %(levelname)s: %(message)s',
+            level=self.logging_level)
+
+    def deploy(self):
+        """
+        With the completion of this step the desired architecture of the HPO framework should be
+        running on a platform, e.g., in the case of Kubernetes it refers to the steps necessary
+        to deploy all pods and services in Kubernetes.
+        """
+        log.info("Deploying Katib:")
+        res = os.popen(
+            'kubectl apply -k "github.com/kubeflow/katib.git/manifests/v1beta1/installs/katib-standalone?ref=master"').read()
+        log.info(res)
+
+        config.load_kube_config()
+        w = watch.Watch()
+        c = client.CoreV1Api()
+        log.info("Waiting for all Katib pods to be ready:")
+        # Of all pods that Katib starts, only the following are crucial for running experiments
+        monitored_pods = ["katib-cert-generator", "katib-db-manager", "katib-mysql", "katib-ui", "katib-controller"]
+        for e in w.stream(c.list_namespaced_pod, namespace=self.namespace):
+            ob = e["object"]
+            for name in monitored_pods:
+                # Checking whether this event belongs to one of the monitored pods
+                if name in ob.metadata.name:
+                    # Checking whether the pod is already running and its underlying containers are ready
+                    if ob.status.phase == "Running" and ob.status.container_statuses[0].ready:
+                        log.info(f'{ob.metadata.name} is ready')
+                        monitored_pods.remove(name)
+                    # The cert generator is a one-shot job, so "Succeeded" is its ready state
+                    elif name == "katib-cert-generator" and ob.status.phase == "Succeeded":
+                        log.info(f'{ob.metadata.name} is Succeeded')
+                        monitored_pods.remove(name)
+            # Once all monitored pods are running, the deployment process has ended
+            if not monitored_pods:
+                w.stop()
+                log.info("Finished deploying crucial pods")
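+    # NOTE (editorial, assumption): the readiness loop above blocks forever if a
+    # pod never becomes ready. The kubernetes watch accepts a server-side bound,
+    # e.g.:
+    #   for e in w.stream(c.list_namespaced_pod, namespace=self.namespace,
+    #                     timeout_seconds=600):
+    #       ...
+    # after which the stream simply ends and any names left in monitored_pods
+    # can be reported as a deployment failure instead of hanging.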
+ """ + + #creating experiment yaml + + experiment_definition = { + "worker_num": self.workerCount, + "jobs_num":self.jobsCount, + "worker_cpu": self.workerCpu, + "worker_mem": f"{self.workerMemory}Gi", + "worker_image": self.trial_tag, + "study_name": self.study_name, + "trialParameters":"${trialParameters.learningRate}", + "metrics_ip":self.metrics_ip + } + + #loading and fulling the template + with open(path.join(path.dirname(__file__), "experiment_template.yaml"), "r") as f: + job_template = Template(f.read()) + job_yml_objects = job_template.substitute(experiment_definition) + + #writing the experiment definition into the file + with open(path.join(path.dirname(__file__), self.experiment_file_name), "w") as f: + f.write(job_yml_objects) + log.info("Experiment yaml created") + + + #only generating the docker image if specified so. + if self.generate_new_docker_image: + log.info("Creating task docker image") + #creating docker image inside of the minikube + self.image_builder = builder_from_string("minikube")() + PROJECT_ROOT = os.path.abspath(os.path.join(__file__ ,"../../../")) + res = self.image_builder.deploy_image( + f'experiments/katib_minikube/{self.trial_tag}/Dockerfile', self.trial_tag,PROJECT_ROOT) + log.info(res) + log.info(f"Image: {self.trial_tag}") + + + + def run(self): + + log.info("Starting Katib experiment:") + api_instance= client.CustomObjectsApi() + + #Loading experiment definition + with open(path.join(path.dirname(__file__), self.experiment_file_name), "r") as f: + body = yaml.safe_load(f) + + #Starting experiment by creating experiment crd with help of kubernetes API + try: + api_response = api_instance.create_namespaced_custom_object( + group=self.group, + version=self.version, + namespace=self.namespace, + body=body, + plural=self.plural, + ) + log.info("Succses: Experiment started") + log.info(api_response) + except ApiException as e: + log.info("Exception when calling CustomObjectsApi->create_cluster_custom_object: %s\n" % e) + + + #Blocking untill the run is finished + #The GET /apis/{group}/{version}/namespaces/{namespace}/{plural}/{name} endpoint doesnt support watch argument. + + #TODO changing to watching of the jobs instead of the experiment crd? 
+    def run(self):
+        log.info("Starting Katib experiment:")
+        api_instance = client.CustomObjectsApi()
+
+        # Loading the experiment definition
+        with open(path.join(path.dirname(__file__), self.experiment_file_name), "r") as f:
+            body = yaml.safe_load(f)
+
+        # Starting the experiment by creating the experiment CRD via the Kubernetes API
+        try:
+            api_response = api_instance.create_namespaced_custom_object(
+                group=self.group,
+                version=self.version,
+                namespace=self.namespace,
+                body=body,
+                plural=self.plural,
+            )
+            log.info("Success: Experiment started")
+            log.info(api_response)
+        except ApiException as e:
+            log.error("Exception when calling CustomObjectsApi->create_namespaced_custom_object: %s\n" % e)
+
+        # Blocking until the run is finished.
+        # The GET /apis/{group}/{version}/namespaces/{namespace}/{plural}/{name} endpoint
+        # doesn't support the watch argument, so we have to poll.
+        # TODO: switch to watching the jobs instead of the experiment CRD?
+        experiment = self.get_experiment()
+        while "status" not in experiment:
+            log.info("Waiting for the status")
+            sleep(5)
+            experiment = self.get_experiment()
+
+        while experiment["status"]["conditions"][-1]["type"] != "Succeeded":
+            experiment = self.get_experiment()
+            succeeded = experiment["status"].get("trialsSucceeded", 0)
+            log.info(f'Status: {experiment["status"]["conditions"][-1]["reason"]} {succeeded} trials of {self.jobsCount} succeeded')
+            log.debug(experiment["status"])
+            sleep(2)
+
+    def collect_benchmark_metrics(self):
+        return super().collect_benchmark_metrics()
+
+    def get_experiment(self):
+        config.load_kube_config()
+        api = client.CustomObjectsApi()
+        # Requesting the experiment CRD, which contains data about the finished experiment
+        try:
+            resource = api.get_namespaced_custom_object_status(
+                group=self.group,
+                version=self.version,
+                namespace=self.namespace,
+                name=self.study_name,
+                plural=self.plural,
+            )
+            return resource
+        except ApiException as e:
+            log.error("Exception when calling CustomObjectsApi->get_namespaced_custom_object_status: %s\n" % e)
+            return ""
+
+    def collect_run_results(self):
+        log.info("Collecting run results:")
+        experiment = self.get_experiment()
+        log.debug("\n Experiment finished with the following optimal trial:")
+        log.debug(experiment["status"]["currentOptimalTrial"])
+        return experiment["status"]["currentOptimalTrial"]
+
+    def test(self):
+        return super().test()
+
+    def undeploy(self):
+        log.info("Deleting the experiment CRDs from the cluster")
+        config.load_kube_config()
+        api = client.CustomObjectsApi()
+        try:
+            # Deleting all experiment CRDs
+            resource = api.delete_collection_namespaced_custom_object(
+                group=self.group,
+                version=self.version,
+                namespace=self.namespace,
+                plural=self.plural,
+            )
+            log.info(resource)
+        except ApiException as e:
+            log.error("Exception when calling CustomObjectsApi->delete_collection_namespaced_custom_object: %s\n" % e)
+
+        w = watch.Watch()
+        c = client.CoreV1Api()
+        log.info("Deleting the namespace:")
+        res = c.delete_namespace_with_http_info(name=self.namespace)
+
+        log.info("Checking status of the namespace:")
+        # While the namespace still exists we must wait until it is really terminated
+        for e in w.stream(c.list_namespace):
+            ob = e["object"]
+            # When the status of our namespace changes, we check whether the namespace was
+            # really removed from the cluster by requesting it and expecting a 404
+            if ob.metadata.name == self.namespace:
+                try:
+                    log.debug(c.read_namespace_status_with_http_info(name=self.namespace))
+                except ApiException as err:
+                    if err.status != 404:
+                        raise Exception("Something went wrong", err)
+                    log.info("Namespace successfully deleted")
+                    # The image builder only exists if a new image was built in setup()
+                    if self.clean_up and hasattr(self, "image_builder"):
+                        log.info("Deleting task docker image from minikube")
+                        sleep(2)
+                        self.image_builder.cleanup(self.trial_tag)
+                    w.stop()
+
+        log.info("Finished undeploying")
+
+
+if __name__ == "__main__":
+    resources = {
+        # "dockerUserLogin": "",
+        # "dockerUserPassword": "",
+        # "studyName": "",
+        "jobsCount": 2,
+        # "dockerImageTag": "light_task",
+        "workerCount": 2,
+        "metricsIP": urlopen("https://checkip.amazonaws.com").read().decode("utf-8").strip(),
+        "generateNewDockerImage": True,
+        "cleanUp": True,
+    }
+    # bench = KatibBenchmark(resources=resources)
+    # bench.deploy()
+    # bench.setup()
+    # bench.run()
+    # bench.collect_run_results()
+    # bench.undeploy()
+
+    from ml_benchmark.benchmark_runner import BenchmarkRunner
+    runner = BenchmarkRunner(
+        benchmark_cls=KatibBenchmark, resources=resources)
+    runner.run()
\ No newline at end of file
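For reference, a hedged standalone usage that mirrors the commented-out calls in the `__main__` block above (BenchmarkRunner drives the same lifecycle; the IP is a placeholder and must be reachable from inside the cluster):

```python
# Sketch of driving the benchmark by hand, assuming a running minikube
# cluster and a configured kubectl context.
resources = {"jobsCount": 2, "workerCount": 2, "metricsIP": "203.0.113.10"}
bench = KatibBenchmark(resources=resources)
bench.deploy()                      # install Katib, wait for core pods
bench.setup()                       # render grid.yaml, build trial image
bench.run()                         # create Experiment CRD, poll until Succeeded
print(bench.collect_run_results())  # currentOptimalTrial from the CRD status
bench.undeploy()                    # delete CRDs, namespace, and trial image
```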
diff --git a/experiments/katib_minikube/light_task/Dockerfile b/experiments/katib_minikube/light_task/Dockerfile
new file mode 100644
index 0000000..8a05a7d
--- /dev/null
+++ b/experiments/katib_minikube/light_task/Dockerfile
@@ -0,0 +1,14 @@
+FROM python:3.9-slim
+
+RUN pip install pip --upgrade
+
+COPY experiments experiments
+# COPY data data
+COPY setup.py setup.py
+COPY ml_benchmark ml_benchmark
+
+RUN pip install -e .
+
+CMD ["python", "experiments/katib_minikube/light_task/task.py"]
\ No newline at end of file
diff --git a/experiments/katib_minikube/light_task/README.md b/experiments/katib_minikube/light_task/README.md
new file mode 100644
index 0000000..3326bc7
--- /dev/null
+++ b/experiments/katib_minikube/light_task/README.md
@@ -0,0 +1,11 @@
+# PyTorch MNIST Image Classification Example
+
+This is a PyTorch MNIST image classification training container that saves metrics
+to a file or prints them to StdOut. It uses a convolutional neural network to
+train the model.
+
+Katib uses this training container in some Experiments, for instance in the
+[file Metrics Collector example](../../metrics-collector/file-metrics-collector.yaml#L55-L64),
+the [file Metrics Collector with logs in JSON format example](../../metrics-collector/file-metrics-collector-with-json-format.yaml#L52-L62),
+the [median stopping early stopping rule with logs in JSON format example](../../early-stopping/median-stop-with-json-format.yaml#L62-L71)
+and the [PyTorchJob example](../../kubeflow-training-operator/pytorchjob-mnist.yaml#L47-L54).
diff --git a/experiments/katib_minikube/light_task/requirements.txt b/experiments/katib_minikube/light_task/requirements.txt
new file mode 100644
index 0000000..9852601
--- /dev/null
+++ b/experiments/katib_minikube/light_task/requirements.txt
@@ -0,0 +1,2 @@
+Pillow>=9.1.1
+numpy
\ No newline at end of file
diff --git a/experiments/katib_minikube/light_task/task.py b/experiments/katib_minikube/light_task/task.py
new file mode 100644
index 0000000..1101b71
--- /dev/null
+++ b/experiments/katib_minikube/light_task/task.py
@@ -0,0 +1,62 @@
+from __future__ import print_function
+
+import argparse
+import logging
+import time
+
+import numpy as np
+
+from ml_benchmark.latency_tracker import latency_decorator
+
+
+@latency_decorator
+def train(times, epochs):
+    # Simulated training: the loss decays linearly over `times` steps per epoch
+    loss = 1
+    for epoch in range(epochs):
+        for x in np.arange(1, times + 1):
+            loss = 1 - (x - 1) / times
+            time.sleep(0.01)
+            msg = "Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}".format(
+                epoch, x, times,
+                100. * x / times, loss)
+            logging.info(msg)
+
+
+def test():
+    # Fixed dummy accuracy, logged in the metric format the experiment expects
+    test_accuracy = 0.7
+    logging.info("precision={:.4f}\n".format(test_accuracy))
+
+
+def main():
+    # Parsing arguments (--lr is accepted for interface parity but unused here)
+    parser = argparse.ArgumentParser(description="MNIST Example")
+    parser.add_argument("--batch-size", type=int, default=64, metavar="N",
+                        help="input batch size for training (default: 64)")
+    parser.add_argument("--epochs", type=int, default=10, metavar="N",
+                        help="number of epochs to train (default: 10)")
+    parser.add_argument("--lr", type=float, default=0.01, metavar="LR",
+                        help="learning rate (default: 0.01)")
+    args = parser.parse_args()
+
+    # Timestamp format
+    logging.basicConfig(
+        format="%(asctime)s %(levelname)-8s %(message)s",
+        datefmt="%Y-%m-%dT%H:%M:%SZ",
+        level=logging.DEBUG)
+
+    # Model training and testing
+    epochs = args.epochs
+    batch_size = args.batch_size
+
+    train(batch_size, epochs)
+    test()
+
+
+if __name__ == "__main__":
+    main()
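A note on the metric lines above (my understanding of Katib's default StdOut metrics collector; treat the exact format as an assumption): it scans the trial's standard output for `name=value` pairs, so the objective declared as `objectiveMetricName: precision` in the Experiment spec is satisfied by a line such as:

```python
# Emitting a metric for the StdOut collector: one "name=value" token per line;
# "precision" matches objectiveMetricName in grid.yaml above.
precision = 0.7321
print(f"precision={precision:.4f}")
```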
diff --git a/experiments/katib_minikube/mnist_task/Dockerfile b/experiments/katib_minikube/mnist_task/Dockerfile
new file mode 100644
index 0000000..0415797
--- /dev/null
+++ b/experiments/katib_minikube/mnist_task/Dockerfile
@@ -0,0 +1,15 @@
+FROM python:3.9.6-slim
+
+RUN pip install pip --upgrade
+RUN pip install torch==1.10.2 torchvision==0.11.3
+RUN pip install optuna==2.10.1
+
+COPY experiments/katib_minikube experiments/katib_minikube
+COPY setup.py setup.py
+COPY ml_benchmark ml_benchmark
+
+RUN pip install -e .
+
+CMD ["python", "experiments/katib_minikube/mnist_task/task.py"]
diff --git a/experiments/katib_minikube/mnist_task/README.md b/experiments/katib_minikube/mnist_task/README.md
new file mode 100644
index 0000000..3326bc7
--- /dev/null
+++ b/experiments/katib_minikube/mnist_task/README.md
@@ -0,0 +1,11 @@
+# PyTorch MNIST Image Classification Example
+
+This is a PyTorch MNIST image classification training container that saves metrics
+to a file or prints them to StdOut. It uses a convolutional neural network to
+train the model.
+
+Katib uses this training container in some Experiments, for instance in the
+[file Metrics Collector example](../../metrics-collector/file-metrics-collector.yaml#L55-L64),
+the [file Metrics Collector with logs in JSON format example](../../metrics-collector/file-metrics-collector-with-json-format.yaml#L52-L62),
+the [median stopping early stopping rule with logs in JSON format example](../../early-stopping/median-stop-with-json-format.yaml#L62-L71)
+and the [PyTorchJob example](../../kubeflow-training-operator/pytorchjob-mnist.yaml#L47-L54).
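Since `imagePullPolicy: IfNotPresent` relies on the image already existing inside minikube, the trial image must be built against minikube's Docker daemon. A hedged sketch with the Docker SDK (the minikube image builder used in setup() wraps something equivalent; exported `minikube docker-env` variables are assumed):

```python
import docker

# Builds the trial image inside minikube's Docker daemon, provided the
# DOCKER_HOST/DOCKER_CERT_PATH variables from `minikube docker-env` are set.
client = docker.from_env()
image, build_logs = client.images.build(
    path=".",  # repository root as build context
    dockerfile="experiments/katib_minikube/mnist_task/Dockerfile",
    tag="mnist_task",
)
for chunk in build_logs:
    if "stream" in chunk:
        print(chunk["stream"], end="")
```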
diff --git a/experiments/katib_minikube/mnist_task/requirements.txt b/experiments/katib_minikube/mnist_task/requirements.txt
new file mode 100644
index 0000000..9852601
--- /dev/null
+++ b/experiments/katib_minikube/mnist_task/requirements.txt
@@ -0,0 +1,2 @@
+Pillow>=9.1.1
+numpy
\ No newline at end of file
diff --git a/experiments/katib_minikube/mnist_task/task.py b/experiments/katib_minikube/mnist_task/task.py
new file mode 100644
index 0000000..2d3af82
--- /dev/null
+++ b/experiments/katib_minikube/mnist_task/task.py
@@ -0,0 +1,78 @@
+import argparse
+import logging
+import os
+import sys
+import time
+
+import numpy as np
+
+PROJECT_ROOT = os.path.abspath(os.path.join(__file__, "../../../../../"))
+sys.path.append(PROJECT_ROOT)
+from ml_benchmark.workload.mnist.mnist_task import MnistTask
+
+
+def train(times, epoch):
+    # Simulated training loop, kept for parity with the light task; the real
+    # training below goes through MnistTask
+    loss = 1
+    for x in np.arange(1, times + 1):
+        loss = 1 - (x - 1) / times
+        time.sleep(0.1)
+        msg = "Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}".format(
+            epoch, x, times,
+            100. * x / times, loss)
+        logging.info(msg)
+
+
+def test():
+    test_accuracy = 0.8
+    logging.info("Validation-accuracy={:.4f}\n".format(test_accuracy))
+
+
+def main():
+    # Parsing arguments
+    parser = argparse.ArgumentParser(description="MNIST Example")
+    parser.add_argument("--batch-size", type=int, default=64, metavar="N",
+                        help="input batch size for training (default: 64)")
+    parser.add_argument("--epochs", type=int, default=10, metavar="N",
+                        help="number of epochs to train (default: 10)")
+    parser.add_argument("--lr", type=float, default=0.01, metavar="LR",
+                        help="learning rate (default: 0.01)")
+    args = parser.parse_args()
+    epochs = args.epochs
+    lr = args.lr
+
+    # MnistTask
+    task = MnistTask(config_init={"epochs": epochs})
+    objective = task.create_objective()
+    # TODO: add the weight decay to the definition of the template
+    objective.set_hyperparameters({"learning_rate": lr, "weight_decay": 0.01})
+    objective.train()
+    validation_scores = objective.validate()
+
+    # Getting the results; printed as "name=value" for the Katib metrics collector
+    avg = validation_scores["weighted avg"]
+    print(f'precision={avg["precision"]}')
+    # print(f'f1-score={avg["f1-score"]}')
+
+
+if __name__ == "__main__":
+    main()
diff --git a/experiments/katib_minikube/requirements.txt b/experiments/katib_minikube/requirements.txt
new file mode 100644
index 0000000..8ef8413
--- /dev/null
+++ b/experiments/katib_minikube/requirements.txt
@@ -0,0 +1,2 @@
+kubernetes==24.2.0
+torch
\ No newline at end of file
diff --git a/ml_benchmark/__init__.py b/ml_benchmark/__init__.py
index f628f98..2742851 100644
--- a/ml_benchmark/__init__.py
+++ b/ml_benchmark/__init__.py
@@ -1,7 +1,8 @@
 __version__ = "develop"
 install_requires = [
     "scikit-learn==0.24.2",
-    "tqdm==4.62.3", "SQLAlchemy==1.4.31", "docker==4.4.2",
+    "tqdm==4.62.3", "SQLAlchemy==1.4.31", "docker==4.4.2",
+    "psycopg2-binary"],
 test_install_requires = ["pytest==7.1.2", "pytest-cov==3.0.0"]
 URL = "https://github.com/gebauerm/ml_benchmark"
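Context for the new psycopg2-binary dependency (my reading, not stated in the diff): the trial containers write latencies to the Postgres metrics store referenced by DB_CONN in the experiment YAML, and SQLAlchemy needs a Postgres driver for that URL scheme. A minimal sketch:

```python
from sqlalchemy import create_engine, text

# The postgresql:// scheme makes SQLAlchemy load psycopg2 under the hood;
# the URL matches the DB_CONN value injected into the trial containers above.
engine = create_engine(
    "postgresql://postgresadmin:admin123@postgres:5432/postgresdb")
with engine.connect() as conn:
    print(conn.execute(text("SELECT 1")).scalar())
```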
diff --git a/ml_benchmark/latency_tracker.py b/ml_benchmark/latency_tracker.py
index 0f98094..f5929f8 100644
--- a/ml_benchmark/latency_tracker.py
+++ b/ml_benchmark/latency_tracker.py
@@ -61,8 +61,9 @@ def track(self, latency_obj):
             stmt = insert(latency).values(latency_obj.to_dict())
             conn.execute(stmt)
             print(f"Latency Recorded! Latency: {latency_obj.to_dict()}")
-        except Exception:
-            print(f"Failed to record latency: {latency_obj.to_dict()}")
+        except Exception as e:
+            print(f"Failed to record latency: {latency_obj.to_dict()}", e)
+
     def _get_connection_string(self):
         # XXX: list order is implicitly a priority
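Finally, how the trial scripts hook into this tracker: light_task/task.py above decorates its train() with latency_decorator, which times the call and records the result through the tracker (whose error path now also prints the underlying exception). A minimal usage sketch:

```python
from ml_benchmark.latency_tracker import latency_decorator

# The decorator measures the duration of the call and hands it to the
# latency tracker, exactly as the light task does for its train() function.
@latency_decorator
def train(steps, epochs):
    ...  # the timed workload

train(64, 10)
```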