From 3b2c2788b2414b20f7f53badbe0f1149baee024d Mon Sep 17 00:00:00 2001 From: mak-454 Date: Thu, 21 Sep 2023 11:16:20 +0000 Subject: [PATCH 1/3] sample to show building a complex pipeline. --- Dockerfile | 57 ++++++++- conda_env.yaml | 15 ++- kubeflow/pipeline/workflow.py | 225 ++++++++++++++++++++++++++++++---- 3 files changed, 263 insertions(+), 34 deletions(-) diff --git a/Dockerfile b/Dockerfile index 6a8487b..17dd4b9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,9 +1,58 @@ -FROM ocdr/d3cicd_base:0.1.0 +FROM ubuntu:18.04 + +RUN apt-get update -y && DEBIAN_FRONTEND=noninteractive apt-get -y install git python-pip curl + +RUN apt-get install -y python3.7 python3-distutils libpython3.7 + + +#conda +ENV PATH="/root/miniconda3/bin:${PATH}" +ARG PATH="/root/miniconda3/bin:${PATH}" +RUN apt-get update + +RUN apt-get install -y wget && rm -rf /var/lib/apt/lists/* + +RUN wget \ + https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh \ + && mkdir /root/.conda \ + && bash Miniconda3-latest-Linux-x86_64.sh -b \ + && rm -f Miniconda3-latest-Linux-x86_64.sh ADD conda_env.yaml . RUN conda env create -f conda_env.yaml && \ - conda clean -afy && conda init bash && \ - echo "source activate dkube-env" > ~/.bashrc + conda clean -afy && conda init bash && \ + echo "conda activate dkube-env" >> ~/.bashrc + +RUN ln -s /usr/lib/x86_64-linux-musl/libc.so /lib/libc.musl-x86_64.so.1 + +#dcc +#RUN source ~/.bashrc && curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py && python3.7 get-pip.py && \ +# rm get-pip.py && \ +# pip install dkube-cicd-controller==1.6.0 setuptools==66.1.1 && \ +# ln -s /usr/lib/x86_64-linux-musl/libc.so /lib/libc.musl-x86_64.so.1 + +#RUN conda activate dkube-env && pip install dkube-cicd-controller==1.6.0 && ln -s /usr/lib/x86_64-linux-musl/libc.so /lib/libc.musl-x86_64.so.1 + +#kubectl +RUN curl -LO https://dl.k8s.io/release/v1.20.0/bin/linux/amd64/kubectl +RUN mv kubectl /usr/local/bin/kubectl && chmod +x /usr/local/bin/kubectl + +#RUN sed -i '1c\#!\/root\/miniconda3\/envs\/dkube-env\/bin\/python3.7' /usr/local/bin/dsl-compile +#RUN sed -i '1c\#!\/root\/miniconda3\/envs\/dkube-env\/bin\/python3.7' /usr/local/bin/dcc + +#RUN echo "import re \ +#import sys \ +#from kfp.compiler.main import main \ +#if __name__ == '__main__': \ +# sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) \ +# sys.exit(main())" > /usr/local/bin/dsl-compile + RUN touch /built_using_dockerfile -ENV PATH=/opt/conda/envs/dkube-env/bin:$PATH \ No newline at end of file +ENV PATH=/opt/conda/envs/dkube-env/bin:$PATH + +ENV TINI_VERSION v0.16.1 +ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini /usr/bin/tini +RUN chmod +x /usr/bin/tini + +ENTRYPOINT [ "/usr/bin/tini", "--" ] diff --git a/conda_env.yaml b/conda_env.yaml index 1990836..0535849 100644 --- a/conda_env.yaml +++ b/conda_env.yaml @@ -4,8 +4,17 @@ channels: - defaults dependencies: - - python=3.8 + - python=3.7 - pip - pip: - - scikit-learn - - kfserving + - dkube-cicd-controller==1.6.0 + - git+https://github.com/oneconvergence/dkube.git@3.8 + - matplotlib + - logger + - scipy + - mlflow + - argparse + - kfp-server-api==1.2.0 + - kfp==1.6.3 +variables: + DKUBE_USER_ACCESS_TOKEN: "eyJhbGciOiJSUzI1NiIsImtpZCI6Ijc0YmNkZjBmZWJmNDRiOGRhZGQxZWIyOGM2MjhkYWYxIn0.eyJ1c2VybmFtZSI6Im9jZGt1YmUiLCJyb2xlIjoiZGF0YXNjaWVudGlzdCxtbGUscGUsb3BlcmF0b3IiLCJkZXBsb3kiOmZhbHNlLCJleHAiOjQ5MzAwOTc0ODUsImlhdCI6MTY5MDA5NzQ4NSwiaXNzIjoiRGt1YmUifQ.3oVnUtFZaBZrXnZr2a5SnftJvXYjn3ZtbGnoNy31CBXgDS7CFZ8ldxvqLlkd8xCXAO5S6Fz2LEYhp3D1zEybuTqwVFVqwRTXe8CnuN1iz_UT4Gj6XZuNqZ29_XK0kjc_X86jx0-fLUVhk6sWpulFR_q6lyjVm63zi3oAD74sVtl-Cvi4MHYrBXNV_eb3dliwsyBneM1VyOXsPX13XvRogdDkQAf22p_hDO3TjVEo7wmx_b4VoEIvY-Vl3Qr0An7drXHT5sBejFjRXxB_0vuAii1AJh66QoLVgXal6g9dMvmpCXZ9K4g09QbUg1c1_VP-VO-Rh189BzIDyp34dMcrGQ" diff --git a/kubeflow/pipeline/workflow.py b/kubeflow/pipeline/workflow.py index 6f382f3..fe85383 100644 --- a/kubeflow/pipeline/workflow.py +++ b/kubeflow/pipeline/workflow.py @@ -12,37 +12,208 @@ # See the License for the specific language governing permissions and # limitations under the License. """Pipeline workflow definition.""" - -import kfp import kfp.dsl as dsl +from kfp import components +from kfp.components._yaml_utils import load_yaml +from kfp.components._yaml_utils import dump_yaml +from kubernetes import client as k8s_client +import json +import os +from dkube.sdk import * -# Initialize component store -component_store = kfp.components.ComponentStore( - local_search_paths=["kubeflow/components"] -) +setup_component = ''' +name: create_dkube_resource +description: | + creates dkube resources required for pipeline. +metadata: + annotations: {platform: 'Dkube'} + labels: {stage: 'create_dkube_resource', logger: 'dkubepl', wfid: '{{workflow.uid}}', runid: '{{pod.name}}'} +inputs: + - {name: token, type: String, optional: false, + description: 'Required. Dkube authentication token.'} + - {name: user, type: String, optional: false, + description: 'Required. Dkube Logged in User name.'} + - {name: project_id, type: String, optional: false, + description: 'Required. Dkube Project name.'} +implementation: + container: + image: docker.io/ocdr/dkube-examples-setup:cli-reg-2 + command: ['python3', 'regressionsetup.py'] + args: [ + --auth_token, {inputValue: token}, + --user, {inputValue: user}, + --project_id, {inputValue: project_id} + ] +''' + +# Get the project ID passed from JupyterLab creation +project_id = os.environ.get("DKUBE_PROJECT_ID", "") +print("project id") +# Check to see if this is blank, and if so use "clinical-reg" +if not project_id: + print("inside project id") + project_name = "clinical-reg" + DKUBE_URL = "https://dkube-proxy.dkube:443" + DKUBE_TOKEN = os.getenv("DKUBE_USER_ACCESS_TOKEN", "") + print(DKUBE_TOKEN) + api = DkubeApi(URL=DKUBE_URL,token=DKUBE_TOKEN) + + # If "clinical-reg" already exists, just get the project ID + try: + project = DkubeProject(project_name) + res = api.create_project(project) + except Exception as e: + if e.reason.lower()=="conflict": + print(f"Project \"{project_name}\" already exists") + project_id = api.get_project_id(project_name) + print(project_id) -# Create component factories -add_op = component_store.load_component("my_add") -divide_op = component_store.load_component("my_divide") +def _component(stage, name): + with open('kubeflow/components/{}/component.yaml'.format(stage), 'rb') as stream: + cdict = load_yaml(stream) + cdict['name'] = name + cyaml = dump_yaml(cdict) + return components.load_component_from_text(cyaml) + +setup_op = components.load_component(text = setup_component) -# Define pipeline @dsl.pipeline( - name="A Simple CI pipeline", - description="Basic sample to show how to do CI with KFP using CloudBuild", + name='dkube-regression-pl', + description='sample regression pipeline with dkube components' ) -def pipeline( - x_value: int = 1, y_value: int = 1, z_value: int = 1, -): - add_step = add_op(x_value=x_value, y_value=y_value) - add_step.set_display_name("Add x and y") - add_result = add_step.outputs - sum_value = add_result["sum"] - with kfp.dsl.Condition(sum_value != 0): - divide_step = divide_op(x_value=sum_value, y_value=z_value) - divide_step.set_display_name("Divide sum by z") - add_step2 = add_op( - x_value=divide_step.outputs["quotient"], - y_value=divide_step.outputs["remainder"], - ) - add_step2.set_display_name("Add quotient and remainder") +def d3pipeline( + user, + auth_token, + tags, + project_id=project_id, + #Clinical preprocess + clinical_preprocess_script="python clinical_reg/cli-pre-processing.py", + clinical_preprocess_datasets=json.dumps(["clinical"]), + clinical_preprocess_input_mounts=json.dumps(["/opt/dkube/input"]), + clinical_preprocess_outputs=json.dumps(["clinical-preprocessed"]), + clinical_preprocess_output_mounts=json.dumps(["/opt/dkube/output"]), + + #Image preprocess + image_preprocess_script="python clinical_reg/img-pre-processing.py", + image_preprocess_datasets=json.dumps(["images"]), + image_preprocess_input_mounts=json.dumps(["/opt/dkube/input"]), + image_preprocess_outputs=json.dumps(["images-preprocessed"]), + image_preprocess_output_mounts=json.dumps(["/opt/dkube/output"]), + + #Clinical split + clinical_split_script="python clinical_reg/split.py --datatype clinical", + clinical_split_datasets=json.dumps(["clinical-preprocessed"]), + clinical_split_input_mounts=json.dumps(["/opt/dkube/input"]), + clinical_split_outputs=json.dumps(["clinical-train", "clinical-test", "clinical-val"]), + clinical_split_output_mounts=json.dumps(["/opt/dkube/outputs/train", "/opt/dkube/outputs/test", "/opt/dkube/outputs/val"]), + + #Image split + image_split_script="python clinical_reg/split.py --datatype image", + image_split_datasets=json.dumps(["images-preprocessed"]), + image_split_input_mounts=json.dumps(["/opt/dkube/input"]), + image_split_outputs=json.dumps(["images-train", "images-test", "images-val"]), + image_split_output_mounts=json.dumps(["/opt/dkube/outputs/train", "/opt/dkube/outputs/test", "/opt/dkube/outputs/val"]) , + + #RNA split + rna_split_script="python clinical_reg/split.py --datatype rna", + rna_split_datasets=json.dumps(["rna"]), + rna_split_input_mounts=json.dumps(["/opt/dkube/input"]), + rna_split_outputs=json.dumps(["rna-train", "rna-test", "rna-val"]), + rna_split_output_mounts=json.dumps(["/opt/dkube/outputs/train", "/opt/dkube/outputs/test", "/opt/dkube/outputs/val"]), + + #Training + job_group = 'default', + #Framework. One of tensorflow, pytorch, sklearn + framework = "tensorflow", + #Framework version + version = "2.3.0", + #In notebook DKUBE_USER_ACCESS_TOKEN is automatically picked up from env variable + #Or any other custom image name can be supplied. + #For custom private images, please input username/password + training_container=json.dumps({'image':'ocdr/dkube-datascience-tf-cpu:v2.3.0-17'}), + #Name of the workspace in dkube. Update accordingly if different name is used while creating a workspace in dkube. + training_program="regression", + #Script to run inside the training container + training_script="python clinical_reg/train_nn.py --epochs 5", + #Input datasets for training. Update accordingly if different name is used while creating dataset in dkube. + training_datasets=json.dumps(["clinical-train", "clinical-val", "images-train", + "images-val", "rna-train", "rna-val"]), + training_input_dataset_mounts=json.dumps(["/opt/dkube/inputs/train/clinical", "/opt/dkube/inputs/val/clinical", + "/opt/dkube/inputs/train/images", "/opt/dkube/inputs/val/images", + "/opt/dkube/inputs/train/rna", "/opt/dkube/inputs/val/rna"]), + training_outputs=json.dumps(["regression-model"]), + training_output_mounts=json.dumps(["/opt/dkube/output"]), + #Request gpus as needed. Val 0 means no gpu, then training_container=docker.io/ocdr/dkube-datascience-tf-cpu:v1.12 + training_gpus=0, + #Any envs to be passed to the training program + training_envs=json.dumps([{"steps": 100}]), + + tuning=json.dumps({}), + + #Evaluation + evaluation_script="python clinical_reg/evaluate.py", + evaluation_datasets=json.dumps(["clinical-test", "images-test", "rna-test"]), + evaluation_input_dataset_mounts=json.dumps(["/opt/dkube/inputs/test/clinical", "/opt/dkube/inputs/test/images", + "/opt/dkube/inputs/test/rna"]), + evaluation_models=json.dumps(["regression-model"]), + evaluation_input_model_mounts=json.dumps(["/opt/dkube/inputs/model"]), + + #Serving + #Device to be used for serving - dkube mnist example trained on gpu needs gpu for serving else set this param to 'cpu' + deployment_name='regression-pl', + serving_device='cpu', + #Serving image + serving_image=json.dumps({'image':'ocdr/tensorflowserver:2.3.0'}), + #Transformer image + transformer_image=json.dumps({'image':'ocdr/dkube-datascience-tf-cpu:v2.3.0-17'}), + #Script to execute the transformer + transformer_code="clinical_reg/transformer.py"): + + create_resource = setup_op(user = user, token = auth_token, project_id = project_id) + + create_resource.execution_options.caching_strategy.max_cache_staleness = "P0D" + + clinical_preprocess = _component('preprocess', 'clinical-preprocess')(container=training_container, + tags=tags, program=training_program, run_script=clinical_preprocess_script, + datasets=clinical_preprocess_datasets, outputs=clinical_preprocess_outputs, + input_dataset_mounts=clinical_preprocess_input_mounts, output_mounts=clinical_preprocess_output_mounts).after(create_resource) + image_preprocess = _component('preprocess', 'images-preprocess')(container=training_container, + tags=tags, program=training_program, run_script=image_preprocess_script, + datasets=image_preprocess_datasets, outputs=image_preprocess_outputs, + input_dataset_mounts=image_preprocess_input_mounts, output_mounts=image_preprocess_output_mounts).after(create_resource) + + clinical_split = _component('preprocess', 'clinical-split')(container=training_container, + tags=tags, program=training_program, run_script=clinical_split_script, + datasets=clinical_split_datasets, outputs=clinical_split_outputs, + input_dataset_mounts=clinical_split_input_mounts, + output_mounts=clinical_split_output_mounts).after(clinical_preprocess) + + image_split = _component('preprocess', 'images-split')(container=training_container, + tags=tags, program=training_program, run_script=image_split_script, + datasets=image_split_datasets, outputs=image_split_outputs, + input_dataset_mounts=image_split_input_mounts, + output_mounts=image_split_output_mounts).after(image_preprocess) + + rna_split = _component('preprocess', 'rna-split')(container=training_container, + tags=tags, program=training_program, run_script=rna_split_script, + datasets=rna_split_datasets, outputs=rna_split_outputs, + input_dataset_mounts=rna_split_input_mounts, output_mounts=rna_split_output_mounts).after(create_resource) + + train = _component('training', 'regression-model-training')(container=training_container, + tags=tags, program=training_program, run_script=training_script, + datasets=training_datasets, outputs=training_outputs, + input_dataset_mounts=training_input_dataset_mounts, + output_mounts=training_output_mounts, + ngpus=training_gpus, + envs=training_envs, + tuning=tuning, job_group=job_group, + framework=framework, version=version).after(clinical_split).after(image_split).after(rna_split) + serving = _component('serving', 'model-serving')(model=train.outputs['artifact'], device=serving_device, + name=deployment_name, + serving_image=serving_image, + transformer_image=transformer_image, + transformer_project=training_program, + transformer_code=transformer_code).after(train) + inference = _component('viewer', 'model-inference')(servingurl=serving.outputs['servingurl'], + servingexample='regression', viewtype='inference').after(serving) From 26fd3840b416cd7062a0825040f99ad8fbd1c171 Mon Sep 17 00:00:00 2001 From: mak-454 Date: Thu, 21 Sep 2023 11:25:10 +0000 Subject: [PATCH 2/3] imp fixes in dockerfile. --- .dkube-ci.yml | 2 +- Dockerfile | 20 ++------------------ 2 files changed, 3 insertions(+), 19 deletions(-) diff --git a/.dkube-ci.yml b/.dkube-ci.yml index c4a07f6..b35c03e 100644 --- a/.dkube-ci.yml +++ b/.dkube-ci.yml @@ -8,4 +8,4 @@ pipelines: path: kubeflow/pipeline pipelines: - name: workflow.py - build_image: ocdr/d3cicd:1.5.9 + build_image: ocdr/d3cicd:regression diff --git a/Dockerfile b/Dockerfile index 17dd4b9..58bd793 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,30 +23,14 @@ RUN conda env create -f conda_env.yaml && \ conda clean -afy && conda init bash && \ echo "conda activate dkube-env" >> ~/.bashrc +RUN cp /root/miniconda3/envs/dkube-env/bin/dcc /usr/local/bin/dcc +RUN cp /root/miniconda3/envs/dkube-env/bin/dsl-compile /usr/local/bin/dsl-compile RUN ln -s /usr/lib/x86_64-linux-musl/libc.so /lib/libc.musl-x86_64.so.1 -#dcc -#RUN source ~/.bashrc && curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py && python3.7 get-pip.py && \ -# rm get-pip.py && \ -# pip install dkube-cicd-controller==1.6.0 setuptools==66.1.1 && \ -# ln -s /usr/lib/x86_64-linux-musl/libc.so /lib/libc.musl-x86_64.so.1 - -#RUN conda activate dkube-env && pip install dkube-cicd-controller==1.6.0 && ln -s /usr/lib/x86_64-linux-musl/libc.so /lib/libc.musl-x86_64.so.1 - #kubectl RUN curl -LO https://dl.k8s.io/release/v1.20.0/bin/linux/amd64/kubectl RUN mv kubectl /usr/local/bin/kubectl && chmod +x /usr/local/bin/kubectl -#RUN sed -i '1c\#!\/root\/miniconda3\/envs\/dkube-env\/bin\/python3.7' /usr/local/bin/dsl-compile -#RUN sed -i '1c\#!\/root\/miniconda3\/envs\/dkube-env\/bin\/python3.7' /usr/local/bin/dcc - -#RUN echo "import re \ -#import sys \ -#from kfp.compiler.main import main \ -#if __name__ == '__main__': \ -# sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0]) \ -# sys.exit(main())" > /usr/local/bin/dsl-compile - RUN touch /built_using_dockerfile ENV PATH=/opt/conda/envs/dkube-env/bin:$PATH From e6e2f36a631fa503f785e6c393cc479b500b086c Mon Sep 17 00:00:00 2001 From: mak-454 Date: Tue, 26 Sep 2023 17:47:07 +0530 Subject: [PATCH 3/3] added the missing components. --- kubeflow/components/job/component.yaml | 81 +++++++ kubeflow/components/preprocess/component.yaml | 101 ++++++++ kubeflow/components/serving/component.yaml | 87 +++++++ kubeflow/components/slurm/preprocessing.yaml | 228 ++++++++++++++++++ kubeflow/components/slurm/training.yaml | 228 ++++++++++++++++++ kubeflow/components/storage/component.yaml | 41 ++++ kubeflow/components/submit/component.yaml | 31 +++ kubeflow/components/training/component.yaml | 132 ++++++++++ kubeflow/components/viewer/component.yaml | 39 +++ 9 files changed, 968 insertions(+) create mode 100644 kubeflow/components/job/component.yaml create mode 100644 kubeflow/components/preprocess/component.yaml create mode 100644 kubeflow/components/serving/component.yaml create mode 100644 kubeflow/components/slurm/preprocessing.yaml create mode 100644 kubeflow/components/slurm/training.yaml create mode 100644 kubeflow/components/storage/component.yaml create mode 100644 kubeflow/components/submit/component.yaml create mode 100644 kubeflow/components/training/component.yaml create mode 100644 kubeflow/components/viewer/component.yaml diff --git a/kubeflow/components/job/component.yaml b/kubeflow/components/job/component.yaml new file mode 100644 index 0000000..d7e88a1 --- /dev/null +++ b/kubeflow/components/job/component.yaml @@ -0,0 +1,81 @@ +name: dkube-job +description: | + Component which can be used to run a user job with DKube resources. +metadata: + annotations: {platform: 'Dkube'} + labels: {platform: 'Dkube', logger: 'dkubepl', wfid: '{{workflow.uid}}', runid: '{{pod.name}}', stage: 'preprocess'} +inputs: + - {name: job_prefix, type: String, optional: false, + description: 'Required. prefix to add to job name.'} + - {name: auth_token, type: String, optional: false, + description: 'Required. Dkube authentication token.'} + - {name: container, type: Dict, optional: false, + description: 'Required. Container to use for preprocessing. Format: {"image":, "username":<>, "password":<>}'} + - {name: program, type: String, default: '', + description: 'Optional. Program imported in Dkube to be run inside container. If not specified container should have entrypoint.'} + - {name: commit_id, type: String, default: '', + description: 'Optional. Program commit ID. If not provided, dkube takes the latest commit.'} + - {name: run_script, type: String, default: '', + description: 'Optional. Script to run the program. If not specified container should have entrypoint.'} + - {name: datasets, type: List, default: '[]', + description: 'Optional. List of input datasets required for preprocessing. These datasets must be created in Dkube.'} + - {name: input_dataset_mounts, type: List, default: '[]', + description: 'Optional. List of input datasets mount paths.'} + - {name: input_dataset_versions, type: List, default: '[]', + description: 'Optional. List of input datasets versions.'} + - {name: featuresets, type: List, default: '[]', + description: 'Optional. List of input featuresets required for preprocessing. These featuresets must be created in Dkube.'} + - {name: input_featureset_mounts, type: List, default: '[]', + description: 'Optional. List of input featuresets mount paths.'} + - {name: input_featureset_versions, type: List, default: '[]', + description: 'Optional. List of input featureset versions.'} + - {name: models, type: List, default: '[]', + description: 'Optional. List of input models required for preprocessing. These models must be created in Dkube.'} + - {name: input_model_mounts, type: List, default: '[]', + description: 'Optional. List of input models mount paths.'} + - {name: input_model_versions, type: List, default: '[]', + description: 'Optional. List of input models versions.'} + - {name: file_outputs, type: Dict, optional: true, + description: 'Optional. Dict containing output keys and paths'} + - {name: envs, type: List, default: '[]', + description: 'Optional. Environments for preprocess program. Exact key value will be made available for the container'} + - {name: tags, type: List, default: '[]', + description: 'Optional. List of user-chosen tags to allow Runs to be better identified or grouped.'} +outputs: + - {name: rundetails, description: 'Details of the dkube run'} + - {name: artifact, description: 'Identifier in Dkube storage where artifact generated by this component are stored.'} + - {name: mlpipeline-ui-metadata, description: 'metadata file to display metrics'} + - {name: output, description: 'output file from the job'} +implementation: + container: + image: ocdr/dkube_pylauncher:2.3 + command: ["python3", "-u", "/dkubepl/main.py"] + args: [ + {inputValue: job_prefix}, + {inputValue: auth_token}, + '{{workflow.uid}}', + '{{pod.name}}', + 'job', + --container, {inputValue: container}, + --script, {inputValue: run_script}, + --program, {inputValue: program}, + --commit_id, {inputValue: commit_id}, + --datasets, {inputValue: datasets}, + --input_dataset_mounts, {inputValue: input_dataset_mounts}, + --input_dataset_versions, {inputValue: input_dataset_versions}, + --featuresets, {inputValue: featuresets}, + --input_featureset_mounts, {inputValue: input_featureset_mounts}, + --input_featureset_versions, {inputValue: input_featureset_versions}, + --models, {inputValue: models}, + --input_model_mounts, {inputValue: input_model_mounts}, + --input_model_versions, {inputValue: input_model_versions}, + --envs, {inputValue: envs}, + --tags, {inputValue: tags}, + --output, {inputValue: file_outputs} + ] + fileOutputs: + rundetails: /tmp/rundetails + artifact: /tmp/artifact + mlpipeline-ui-metadata: /metadata.json + output: /output + diff --git a/kubeflow/components/preprocess/component.yaml b/kubeflow/components/preprocess/component.yaml new file mode 100644 index 0000000..3ef9c3f --- /dev/null +++ b/kubeflow/components/preprocess/component.yaml @@ -0,0 +1,101 @@ +name: dkube-preprocess +description: | + Component which can be used to perform data preprocessing on Dkube platform. + Dkube preprocess provides, + * Ability to orchestrate and run custom containers. + * Renders utilization graphs for CPU, Memory. + * Tags to group related preprocessing jobs. +metadata: + annotations: {platform: 'Dkube'} + labels: {platform: 'Dkube', logger: 'dkubepl', wfid: '{{workflow.uid}}', runid: '{{pod.name}}', stage: 'preprocess'} +inputs: + - {name: auth_token, type: String, optional: true, + description: 'Deprecated. Dkube authentication token.'} + - {name: container, type: Dict, optional: true, + description: 'Required. Container to use for preprocessing. Format: {"image":, "username":<>, "password":<>}'} + - {name: program, type: String, optional: true, default: '', + description: 'Optional. Program imported in Dkube to be run inside container. If not specified container should have entrypoint.'} + - {name: commit_id, type: String, optional: true, default: '', + description: 'Optional. Program commit ID. If not provided, dkube takes the latest commit.'} + - {name: run_script, type: String, optional: true, default: '', + description: 'Optional. Script to run the program. If not specified container should have entrypoint.'} + - {name: datasets, type: List, optional: true, default: '[]', + description: 'Optional. List of input datasets required for preprocessing. These datasets must be created in Dkube.'} + - {name: input_dataset_mounts, type: List, optional: true, default: '[]', + description: 'Optional. List of input datasets mount paths.'} + - {name: input_dataset_versions, type: List, optional: true, default: '[]', + description: 'Optional. List of input datasets versions.'} + - {name: input_featuresets, type: List, optional: true, default: '[]', + description: 'Optional. List of input featuresets required for preprocessing. These featuresets must be created in Dkube.'} + - {name: input_featureset_mounts, type: List, optional: true, default: '[]', + description: 'Optional. List of input featuresets mount paths.'} + - {name: input_featureset_versions, type: List, optional: true, default: '[]', + description: 'Optional. List of input featureset versions.'} + - {name: models, type: List, optional: true, default: '[]', + description: 'Optional. List of input models required for preprocessing. These models must be created in Dkube.'} + - {name: input_model_mounts, type: List, optional: true, default: '[]', + description: 'Optional. List of input models mount paths.'} + - {name: input_model_versions, type: List, optional: true, default: '[]', + description: 'Optional. List of input models versions.'} + - {name: outputs, type: List, optional: true, default: '[]', + description: 'Required. List of output datasets of a datajob'} + - {name: output_mounts, type: List, optional: true, default: '[]', + description: 'Required. List of output datasets mount paths'} + - {name: output_featuresets, type: List, optional: true, + description: 'Required. List of output featuresets of a datajob'} + - {name: output_featureset_mounts, type: List, optional: true, + description: 'Required. List of output featuresets mount paths'} + - {name: ngpus, type: Integer, optional: true, default: 0, + description: 'Optional. Number of gpus the training program should use.'} + - {name: config, type: String, optional: true, default: '', + description: 'Optional. HP file or configuration data required for training program. + Supported inputs - + d3s:// - Path to a file in dkube storage. + - Inline data'} + - {name: envs, type: List, optional: true, default: '[]', + description: 'Optional. Environments for preprocess program. Exact key value will be made available for the container'} + - {name: access_url, type: String, optional: true, default: '', + description: 'Optional. URL at which dkube is accessible, copy paste from the browser of this window. Required for cloud deployments.'} + - {name: job_group, type: String, optional: true, default: 'default', + description: 'Optional. Runs can be organized into Groups that allow them to be viewed together. This group must be created in Dkube.'} + - {name: tags, type: List, optional: true, default: '[]', + description: 'Optional. List of user-chosen tags to allow Runs to be better identified or grouped.'} +outputs: + - {name: rundetails, description: 'Details of the dkube run'} + - {name: artifact, description: 'Identifier in Dkube storage where artifact generated by this component are stored.'} +implementation: + container: + image: ocdr/dkubepl:3.8.5 + command: ['dkubepl'] + args: [ + preprocess, + --accessurl, {inputValue: access_url}, + --token, {inputValue: auth_token}, + --container, {inputValue: container}, + --script, {inputValue: run_script}, + --program, {inputValue: program}, + --commit_id, {inputValue: commit_id}, + --datasets, {inputValue: datasets}, + --input_dataset_mounts, {inputValue: input_dataset_mounts}, + --input_dataset_versions, {inputValue: input_dataset_versions}, + --input_featuresets, {inputValue: input_featuresets}, + --input_featureset_mounts, {inputValue: input_featureset_mounts}, + --input_featureset_versions, {inputValue: input_featureset_versions}, + --models, {inputValue: models}, + --input_model_mounts, {inputValue: input_model_mounts}, + --input_model_versions, {inputValue: input_model_versions}, + --outputs, {inputValue: outputs}, + --output_mounts, {inputValue: output_mounts}, + --output_featuresets, {inputValue: output_featuresets}, + --output_featureset_mounts, {inputValue: output_featureset_mounts}, + --ngpus, {inputValue: ngpus}, + --config, {inputValue: config}, + --envs, {inputValue: envs}, + --job_group, {inputValue: job_group}, + --tags, {inputValue: tags}, + --runid, '{{pod.name}}', + --wfid, '{{workflow.uid}}' + ] + fileOutputs: + rundetails: /tmp/rundetails + artifact: /tmp/artifact diff --git a/kubeflow/components/serving/component.yaml b/kubeflow/components/serving/component.yaml new file mode 100644 index 0000000..5e6c846 --- /dev/null +++ b/kubeflow/components/serving/component.yaml @@ -0,0 +1,87 @@ +name: dkube-serving +description: | + Component which can be used to deploy a trained model on Dkube platform. + Dkube serving provides, + * Option to deploy with CPU/GPU. + * A web server in the front and all the required infra to access the server. + * Deployed as microserice. Serving URL is provided for any other application logic to consume the model. + * Attempts to decode and present some abstract information about the model. +metadata: + annotations: {platform: 'Dkube'} + labels: {platform: 'Dkube', logger: 'dkubepl', wfid: '{{workflow.uid}}', runid: '{{pod.name}}', stage: 'serving'} +inputs: + - {name: auth_token, type: String, optional: true, + description: 'Deprecated. Dkube authentication token.'} + - {name: model, type: String, optional: true, + description: 'Required. Trained model in Dkube which is to be deployed for serving.'} + - {name: name, type: String, optional: true, + description: 'Optional. Name of deployment.'} + - {name: model_version, type: String, optional: true, + description: 'Optional. Trained model version.'} + - {name: device, type: String, optional: true, default: 'cpu', + description: 'Optional. Device to use for serving - allowed values, gpu/cpu/auto.'} + - {name: access_url, type: String, optional: true, default: '', + description: 'Optional. URL at which dkube is accessible, copy paste from the browser of this window. Required for cloud deployments.'} + - {name: serving_image, type: Dict, optional: true, + description: 'Required. Container to use for serving. Format: {"image":, "username":<>, "password":<>}'} + - {name: transformer_image, type: Dict, optional: true, + description: 'Required. Container to use as transformer. Format: {"image":, "username":<>, "password":<>}'} + - {name: transformer_project, type: String, optional: true, + description: 'Required. Transformer project.'} + - {name: transformer_code, type: String, optional: true, + description: 'Required. Transformer script.'} + - {name: transformer_commit_id, type: String, optional: true, + description: 'Optional. Transformer project commit ID.'} + - {name: min_replicas, type: String, optional: true, + description: 'Optional. Minimum number of replicas that each Revision should have. If not provided, value from platform config will be used.'} + - {name: max_concurrent_requests, type: String, optional: true, + description: 'Optional. Maximum number of requests an inf pod can process at a time. If not provided, value from platform config will be used.'} + - {name: production, type: String, optional: true, default: 'false', + description: 'Set the value to true for production deployment.' } + - {name: update, type: String, optional: true, default: 'false', + description: 'Set the value to true for updating an existing deployment. All fields must me filled in this case.'} + - {name: envs, type: List, optional: true, default: '[]', + description: 'Optional. Environments for serving program. Exact key value will be made available for the container'} + - {name: min_cpu, type: String, optional: true, + description: 'Specify minimum CPU requirement, e.g 1 (cpu) or 100m (millicpu)'} + - {name: max_cpu, type: String, optional: true, + description: 'Specify maximum CPU requirement, e.g 2 (cpu) or 200m (millicpu)'} + - {name: min_memory, type: String, optional: true, + description: 'Specify minimum memory requirement. For example 128974848, 129e6, 129M, 123Mi.'} + - {name: max_memory, type: String, optional: true, + description: 'Specify maximum memory requirement. For example 128974848, 129e6, 129M, 123Mi.'} +outputs: + - {name: rundetails, description: 'Details of the dkube run'} + - {name: servingurl, description: 'URL at which the serving web server is accessible.'} +implementation: + container: + image: ocdr/dkubepl:3.8.5 + command: ['dkubepl'] + args: [ + serving, + --accessurl, {inputValue: access_url}, + --token, {inputValue: auth_token}, + --model, {inputValue: model}, + --name, {inputValue: name}, + --model_version, {inputValue: model_version}, + --device, {inputValue: device}, + --production, {inputValue: production}, + --update, {inputValue: update}, + --serving_image, {inputValue: serving_image}, + --transformer_image, {inputValue: transformer_image}, + --transformer_project, {inputValue: transformer_project}, + --transformer_code, {inputValue: transformer_code}, + --transformer_commit_id, {inputValue: transformer_commit_id}, + --min_replicas, {inputValue: min_replicas}, + --max_concurrent_requests, {inputValue: max_concurrent_requests}, + --envs, {inputValue: envs}, + --min_cpu, {inputValue: min_cpu}, + --max_cpu, {inputValue: max_cpu}, + --min_memory, {inputValue: min_memory}, + --max_memory, {inputValue: max_memory}, + --runid, '{{pod.name}}', + --wfid, '{{workflow.uid}}' + ] + fileOutputs: + rundetails: /tmp/rundetails + servingurl: /tmp/servingurl diff --git a/kubeflow/components/slurm/preprocessing.yaml b/kubeflow/components/slurm/preprocessing.yaml new file mode 100644 index 0000000..29a8075 --- /dev/null +++ b/kubeflow/components/slurm/preprocessing.yaml @@ -0,0 +1,228 @@ +name: dkube_slurmjob_launcher +description: Launcher for slurmjob using DKube APIs. +metadata: + annotations: {platform: Dkube} + labels: + platform: Dkube + stage: preprocess + logger: dkubepl + wfid: '{{workflow.uid}}' + runid: '{{pod.name}}' +inputs: +- {name: cluster, type: String} +- {name: props, type: type} +- {name: user, type: String} +- {name: token, type: String} +- {name: run, type: type} +- {name: url, type: String, default: 'https://dkube-proxy.dkube', optional: true} +outputs: +- {name: artifacts, type: String} +- {name: run_details, type: String} +implementation: + container: + image: ocdr/dkube_launcher:slurm + command: + - python3 + - -u + - -c + - | + def launch_slurmjob(cluster, props, + user, token, run, + url = 'https://dkube-proxy.dkube'): + + import ast + import json + import os + import pprint + import time + from collections import namedtuple + from json import JSONDecodeError + + import kfp + import yaml + from dkube.sdk.internal.api_base import ApiBase + from dkube.sdk.internal.dkube_api.models.job_model import JobModel + from dkube.slurm.job_properties import JobProperties + from pyfiglet import Figlet + from url_normalize import url_normalize + + if isinstance(run, JobModel) == True: + run = run.to_dict() + elif isinstance(run, str) == True: + run = ast.literal_eval(run) + else: + assert True, "type of parameter run can be either instance of JobModel or a string of dict" + + if isinstance(props, JobProperties) == True: + props = props.to_dict() + elif isinstance(props, str) == True: + props = ast.literal_eval(props) + else: + assert True, "type of parameter props can be either instance of JobProperties or a string of dict" + + kind = run['parameters']['_class'] + assert kind in [ + "training", "preprocessing"], "Slurm job is supported only for Training/Preprocessing DKube job types" + + f = Figlet(font='slant', width=200) + print(f.renderText('Dkube {}'.format("SlurmJob"))) + + print("... Input (run) ...") + print(yaml.dump(run)) + print() + print("... Input (properties) ...") + print(yaml.dump(props)) + print("...................") + + run['parameters']['class'] = kind + + # check if datums are in right format user:datum + if 'datums' in run['parameters'][kind]: + datums = run['parameters'][kind]['datums'] + datasets = datums.get('datasets', []) + if datasets != None: + for idx, item in enumerate(datasets): + if item['name'] != None and ':' not in item['name']: + datasets[idx]['name'] = user + ':' + item['name'] + models = datums.get('models', []) + if models != None: + for idx, item in enumerate(models): + if item['name'] != None and ':' not in item['name']: + models[idx]['name'] = user + ':' + item['name'] + outputs = datums.get('outputs', []) + if outputs != None: + for idx, item in enumerate(outputs): + if item['name'] != None and ':' not in item['name']: + outputs[idx]['name'] = user + ':' + item['name'] + code = datums.get('workspace', None) + if code != None and code['data'] != None and code['data']['name'] != None: + if ':' not in code['data']['name']: + code['data']['name'] = user + ':' + code['data']['name'] + + # check if am running as pipeline component + if os.getenv('pipeline', 'false').lower() == 'true': + wfid, runid = os.getenv("wfid"), os.getenv("runid") + run['name'] = runid + run['parameters'][kind]['tags'].extend( + ['owner=pipeline', 'workflowid=' + wfid, 'runid=' + runid]) + if run['parameters']['generated'] is None: + run['parameters']['generated'] = dict() + run['parameters']['generated']['uuid'] = runid + run['parameters']['generated'].update({'pipeline': {'runid': wfid}}) + + slurm = { + "name": cluster, + "kind": "CLUSTER_SLURMHPC_REMOTE", + "scheduling_opts": { + "slurmhpc": { + "file": { + "name": "properties.json", + "body": json.dumps({ + "extra": json.dumps(props)}) + } + } + } + } + + run['parameters'][kind]["cluster"] = slurm + + name = run['name'] + api = ApiBase(url, token, []) + api._api.jobs_add_one(user=user, data=run, run='true') + + # wait loop + recorded = None + while True: + run = api.get_run(kind, user, name)['job'] + status = run['parameters']['generated']['status'] + state, reason = status['state'], status['reason'] + if state.lower() in ['complete', 'failed', 'error']: + recorded = state + print( + "run {} - completed with state {} and reason {}".format(name, state, reason)) + break + else: + if recorded != state: + print( + "run {} - waiting for completion, current state {}".format(name, state)) + recorded = state + time.sleep(10) + + if recorded.lower() in ['failed', 'error']: + exit(1) + + rundetails = json.dumps(run) + + uuid = run['parameters']['generated']['uuid'] + lineage = api.get_run_lineage(kind, user, uuid) + outputs = lineage['outputs'] + + isremote = lambda output: 'version' in output and output['version'] != None + artifacts=[ + {'datum': output['version']['datum_name'], 'class': output['version']['datum_type'], + 'version': output['version']['uuid'], 'index': output['version']['index'] + } + for output in filter(isremote, outputs) + ] + + artifacts=json.dumps(artifacts) + + output=namedtuple('Outputs', ['artifacts', 'run_details']) + return output(artifacts, rundetails) + + def _serialize_str(str_value: str) -> str: + if not isinstance(str_value, str): + raise TypeError('Value "{}" has type "{}" instead of str.'.format(str(str_value), str(type(str_value)))) + return str_value + + import argparse + _parser = argparse.ArgumentParser(prog='dkube_slurmjob_launcher', description='Launcher for slurmjob using DKube APIs.') + _parser.add_argument("--cluster", dest="cluster", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--props", dest="props", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--user", dest="user", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--token", dest="token", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--run", dest="run", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--url", dest="url", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = launch_slurmjob(**_parsed_args) + + _output_serializers = [ + _serialize_str, + _serialize_str, + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + args: + - --cluster + - {inputValue: cluster} + - --props + - {inputValue: props} + - --user + - {inputValue: user} + - --token + - {inputValue: token} + - --run + - {inputValue: run} + - if: + cond: {isPresent: url} + then: + - --url + - {inputValue: url} + - '----output-paths' + - {outputPath: artifacts} + - {outputPath: run_details} + env: + pipeline: "true" + wfid: '{{workflow.uid}}' + runid: '{{pod.name}}' diff --git a/kubeflow/components/slurm/training.yaml b/kubeflow/components/slurm/training.yaml new file mode 100644 index 0000000..d845508 --- /dev/null +++ b/kubeflow/components/slurm/training.yaml @@ -0,0 +1,228 @@ +name: dkube_slurmjob_launcher +description: Launcher for slurmjob using DKube APIs. +metadata: + annotations: {platform: Dkube} + labels: + platform: Dkube + stage: training + logger: dkubepl + wfid: '{{workflow.uid}}' + runid: '{{pod.name}}' +inputs: +- {name: cluster, type: String} +- {name: props, type: type} +- {name: user, type: String} +- {name: token, type: String} +- {name: run, type: type} +- {name: url, type: String, default: 'https://dkube-proxy.dkube', optional: true} +outputs: +- {name: artifacts, type: String} +- {name: run_details, type: String} +implementation: + container: + image: ocdr/dkube_launcher:slurm + command: + - python3 + - -u + - -c + - | + def launch_slurmjob(cluster, props, + user, token, run, + url = 'https://dkube-proxy.dkube'): + + import ast + import json + import os + import pprint + import time + from collections import namedtuple + from json import JSONDecodeError + + import kfp + import yaml + from dkube.sdk.internal.api_base import ApiBase + from dkube.sdk.internal.dkube_api.models.job_model import JobModel + from dkube.slurm.job_properties import JobProperties + from pyfiglet import Figlet + from url_normalize import url_normalize + + if isinstance(run, JobModel) == True: + run = run.to_dict() + elif isinstance(run, str) == True: + run = ast.literal_eval(run) + else: + assert True, "type of parameter run can be either instance of JobModel or a string of dict" + + if isinstance(props, JobProperties) == True: + props = props.to_dict() + elif isinstance(props, str) == True: + props = ast.literal_eval(props) + else: + assert True, "type of parameter props can be either instance of JobProperties or a string of dict" + + kind = run['parameters']['_class'] + assert kind in [ + "training", "preprocessing"], "Slurm job is supported only for Training/Preprocessing DKube job types" + + f = Figlet(font='slant', width=200) + print(f.renderText('Dkube {}'.format("SlurmJob"))) + + print("... Input (run) ...") + print(yaml.dump(run)) + print() + print("... Input (properties) ...") + print(yaml.dump(props)) + print("...................") + + run['parameters']['class'] = kind + + # check if datums are in right format user:datum + if 'datums' in run['parameters'][kind]: + datums = run['parameters'][kind]['datums'] + datasets = datums.get('datasets', []) + if datasets != None: + for idx, item in enumerate(datasets): + if item['name'] != None and ':' not in item['name']: + datasets[idx]['name'] = user + ':' + item['name'] + models = datums.get('models', []) + if models != None: + for idx, item in enumerate(models): + if item['name'] != None and ':' not in item['name']: + models[idx]['name'] = user + ':' + item['name'] + outputs = datums.get('outputs', []) + if outputs != None: + for idx, item in enumerate(outputs): + if item['name'] != None and ':' not in item['name']: + outputs[idx]['name'] = user + ':' + item['name'] + code = datums.get('workspace', None) + if code != None and code['data'] != None and code['data']['name'] != None: + if ':' not in code['data']['name']: + code['data']['name'] = user + ':' + code['data']['name'] + + # check if am running as pipeline component + if os.getenv('pipeline', 'false').lower() == 'true': + wfid, runid = os.getenv("wfid"), os.getenv("runid") + run['name'] = runid + run['parameters'][kind]['tags'].extend( + ['owner=pipeline', 'workflowid=' + wfid, 'runid=' + runid]) + if run['parameters']['generated'] is None: + run['parameters']['generated'] = dict() + run['parameters']['generated']['uuid'] = runid + run['parameters']['generated'].update({'pipeline': {'runid': wfid}}) + + slurm = { + "name": cluster, + "kind": "CLUSTER_SLURMHPC_REMOTE", + "scheduling_opts": { + "slurmhpc": { + "file": { + "name": "properties.json", + "body": json.dumps({ + "extra": json.dumps(props)}) + } + } + } + } + + run['parameters'][kind]["cluster"] = slurm + + name = run['name'] + api = ApiBase(url, token, []) + api._api.jobs_add_one(user=user, data=run, run='true') + + # wait loop + recorded = None + while True: + run = api.get_run(kind, user, name)['job'] + status = run['parameters']['generated']['status'] + state, reason = status['state'], status['reason'] + if state.lower() in ['complete', 'failed', 'error']: + recorded = state + print( + "run {} - completed with state {} and reason {}".format(name, state, reason)) + break + else: + if recorded != state: + print( + "run {} - waiting for completion, current state {}".format(name, state)) + recorded = state + time.sleep(10) + + if recorded.lower() in ['failed', 'error']: + exit(1) + + rundetails = json.dumps(run) + + uuid = run['parameters']['generated']['uuid'] + lineage = api.get_run_lineage(kind, user, uuid) + outputs = lineage['outputs'] + + isremote = lambda output: 'version' in output and output['version'] != None + artifacts=[ + {'datum': output['version']['datum_name'], 'class': output['version']['datum_type'], + 'version': output['version']['uuid'], 'index': output['version']['index'] + } + for output in filter(isremote, outputs) + ] + + artifacts=json.dumps(artifacts) + + output=namedtuple('Outputs', ['artifacts', 'run_details']) + return output(artifacts, rundetails) + + def _serialize_str(str_value: str) -> str: + if not isinstance(str_value, str): + raise TypeError('Value "{}" has type "{}" instead of str.'.format(str(str_value), str(type(str_value)))) + return str_value + + import argparse + _parser = argparse.ArgumentParser(prog='dkube_slurmjob_launcher', description='Launcher for slurmjob using DKube APIs.') + _parser.add_argument("--cluster", dest="cluster", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--props", dest="props", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--user", dest="user", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--token", dest="token", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--run", dest="run", type=str, required=True, default=argparse.SUPPRESS) + _parser.add_argument("--url", dest="url", type=str, required=False, default=argparse.SUPPRESS) + _parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=2) + _parsed_args = vars(_parser.parse_args()) + _output_files = _parsed_args.pop("_output_paths", []) + + _outputs = launch_slurmjob(**_parsed_args) + + _output_serializers = [ + _serialize_str, + _serialize_str, + + ] + + import os + for idx, output_file in enumerate(_output_files): + try: + os.makedirs(os.path.dirname(output_file)) + except OSError: + pass + with open(output_file, 'w') as f: + f.write(_output_serializers[idx](_outputs[idx])) + args: + - --cluster + - {inputValue: cluster} + - --props + - {inputValue: props} + - --user + - {inputValue: user} + - --token + - {inputValue: token} + - --run + - {inputValue: run} + - if: + cond: {isPresent: url} + then: + - --url + - {inputValue: url} + - '----output-paths' + - {outputPath: artifacts} + - {outputPath: run_details} + env: + pipeline: "true" + wfid: '{{workflow.uid}}' + runid: '{{pod.name}}' diff --git a/kubeflow/components/storage/component.yaml b/kubeflow/components/storage/component.yaml new file mode 100644 index 0000000..743fe23 --- /dev/null +++ b/kubeflow/components/storage/component.yaml @@ -0,0 +1,41 @@ +name: dkube-storage +description: | + Component which can be used to export artifacts in DKube as kubernetes volumes +metadata: + annotations: {platform: 'Dkube'} + labels: {platform: 'Dkube', logger: 'dkubepl', wfid: '{{workflow.uid}}', runid: '{{pod.name}}', stage: 'storage'} +inputs: + - {name: command, type: String, optional: true, + description: 'Required. Command to storage op to export or reclaim.'} + - {name: auth_token, type: String, optional: true, + description: 'Deprecated. Dkube authentication token.'} + - {name: namespace, type: String, default: 'kubeflow', + description: 'Required. Namespace to export volumes to.'} + - {name: input_volumes, type: List, optional: true, default: '[]', + description: 'Optional. List of input artifacts to be exported.'} + - {name: output_volumes, type: List, optional: true, default: '[]', + description: 'Optional. List of input artifacts to be exported.'} + - {name: intermediate_volume, type: String, optional: true, default: '', + description: 'Optional. Intermediate artifact to be exported.'} + - {name: uid, type: String, default: '{{workflow.uid}}', + description: 'Optional. UID of artifact to be reclaimed.'} +outputs: + - {name: volumes, description: 'Volumes exported by this component'} +implementation: + container: + image: ocdr/dkubepl:3.8.5 + command: ['dkubepl'] + args: [ + storage, + --action, {inputValue: command}, + --token, {inputValue: auth_token}, + --namespace, {inputValue: namespace}, + --input_volumes, {inputValue: input_volumes}, + --output_volumes, {inputValue: output_volumes}, + --intermediate_volume, {inputValue: intermediate_volume}, + --uid, {inputValue: uid}, + --runid, '{{pod.name}}', + --wfid, '{{workflow.uid}}', + ] + fileOutputs: + volumes: /tmp/volumes diff --git a/kubeflow/components/submit/component.yaml b/kubeflow/components/submit/component.yaml new file mode 100644 index 0000000..d739085 --- /dev/null +++ b/kubeflow/components/submit/component.yaml @@ -0,0 +1,31 @@ +name: dkube-eval-submit +description: | + Submit predictions to leaderboard. +metadata: + annotations: {platform: 'Dkube'} + labels: {platform: 'Dkube', logger: 'dkubepl', wfid: '{{workflow.uid}}', runid: '{{pod.name}}', stage: 'submit'} +inputs: + - {name: auth_token, type: String, optional: false, + description: 'Required. Dkube authentication token.'} + - {name: project_id, type: String, optional: false, + description: 'Required. ID of the project to which submission is made.'} + - {name: predictions, type: CSV, optional: false, + description: 'Required. CSV prediction file from previous component'} +outputs: + - {name: mlpipeline-ui-metadata, description: 'metadata file to display static html'} + - {name: submit-results, description: 'folder contain output from eval script'} +implementation: + container: + image: ocdr/d3project_eval:v5 + command: ['python'] + args: [ + submit.py, + '{{workflow.uid}}', + "-p", {inputValue: project_id}, + "-t", {inputValue: auth_token}, + {inputPath: predictions} + ] + + fileOutputs: + mlpipeline-ui-metadata: /metadata.json + submit-results: /results diff --git a/kubeflow/components/training/component.yaml b/kubeflow/components/training/component.yaml new file mode 100644 index 0000000..ca11d71 --- /dev/null +++ b/kubeflow/components/training/component.yaml @@ -0,0 +1,132 @@ +name: dkube-training +description: | + Component which can be used to do training for deep learning models on Dkube platform. + Dkube training offers, + * Advanced options for distributed training, gpu managment & pooling. + * Support Hyper parameter tuning. + * GDRDMA support for Horovod like training programs. + * Ability to orchestrate and run custom training containers, prebuilt dkube datascience containers can also be used. + * Renders nice Dashboard for training metrics and utilization graphs for GPU, CPU, Memory. + * Support for early stopping if program is not converging - User can abort the Job and resume from previous point in training. + * Tags to group related training jobs. +metadata: + annotations: {platform: 'Dkube'} + labels: {platform: 'Dkube', logger: 'dkubepl', wfid: '{{workflow.uid}}', runid: '{{pod.name}}', stage: 'training'} +inputs: + - {name: auth_token, type: String, optional: true, + description: 'Deprecated. Dkube authentication token.'} + - {name: container, type: Dict, optional: true, + description: 'Required. Container to use for training. Format: {"image":, "username":<>, "password":<>}'} + - {name: program, type: String, optional: true, default: '', + description: 'Optional. Program imported in Dkube to be run inside container. If not specified container should have entrypoint.'} + - {name: commit_id, type: String, optional: true, default: '', + description: 'Optional. Program commit ID. If not provided, dkube takes the latest commit.'} + - {name: run_script, type: String, optional: true, default: '', + description: 'Optional. Script to run the program. If not specified container should have entrypoint.'} + - {name: datasets, type: List, optional: true, default: '[]', + description: 'Optional. List of input datasets required for training. These datasets must be created in Dkube.'} + - {name: input_dataset_mounts, type: List, optional: true, default: '[]', + description: 'Optional. List of input datasets mount paths.'} + - {name: input_dataset_versions, type: List, optional: true, default: '[]', + description: 'Optional. List of input datasets versions.'} + - {name: featuresets, type: List, optional: true, default: '[]', + description: 'Optional. List of input featuresets required for training. These featuresets must be created in Dkube.'} + - {name: input_featureset_mounts, type: List, optional: true, default: '[]', + description: 'Optional. List of input featureset mount paths.'} + - {name: input_featureset_versions, type: List, optional: true, default: '[]', + description: 'Optional. List of input featureset versions.'} + - {name: models, type: List, optional: true, default: '[]', + description: 'Optional. List of input models required for training. These models must be created in Dkube.'} + - {name: input_model_mounts, type: List, optional: true, default: '[]', + description: 'Optional. List of input models mount paths.'} + - {name: input_model_versions, type: List, optional: true, default: '[]', + description: 'Optional. List of input models mount versions.'} + - {name: outputs, type: List, optional: true, default: '[]', + description: 'Required. List of output models of a training'} + - {name: output_mounts, type: List, optional: true, default: '[]', + description: 'Required. List of output model mount paths'} + - {name: ngpus, type: Integer, optional: true, default: 0, + description: 'Optional. Number of gpus the training program should use.'} + - {name: nworkers, type: Integer, optional: true, default: 0, + description: 'Optional. Number of workers for training, >0 for distributed training.'} + - {name: auto_distribute, type: String, optional: true, default: 'false', + description: 'Optional. Should Dkube auto distribute based on available number of resources.'} + - {name: config, type: String, optional: true, default: '', + description: 'Optional. HP file or configuration data required for training program. + Supported inputs - + d3s:// - Path to a file in dkube storage. + - Inline data'} + - {name: tuning, type: String, optional: true, default: '', + description: 'Optional. HP tuning information. Can be a URL to a file with hptuning definition or inline data. + Supported inputs - + d3s:// - Path to a file in dkube storage. + - Inline data, only json formatted string is valid.'} + - {name: envs, type: List, optional: true, default: '[]', + description: 'Optional. Environments for training program. Exact key value will be made available for the container'} + - {name: gdrdma, type: String, optional: true, default: 'false', + description: 'Optional. Whether to use GDRDMA for distributed training.'} + - {name: access_url, type: String, optional: true, default: '', + description: 'Optional. URL at which dkube is accessible, copy paste from the browser of this window. Required for cloud deployments.'} + - {name: job_group, type: String, optional: true, default: 'default', + description: 'Optional. Runs can be organized into Groups that allow them to be viewed together. This group must be created in Dkube.'} + - {name: framework, type: String, optional: true, + description: 'Required. Framework {tensorflow, pytorch, sklearn, custom...}.'} + - {name: version, type: String, optional: true, + description: 'Required. Framework version.'} + - {name: tags, type: List, optional: true, default: '[]', + description: 'Optional. List of user-chosen tags to allow Runs to be better identified or grouped.'} + - {name: min_cpu, type: String, optional: true, + description: 'Specify minimum CPU requirement, e.g 1 (cpu) or 100m (millicpu)'} + - {name: max_cpu, type: String, optional: true, + description: 'Specify maximum CPU requirement, e.g 2 (cpu) or 200m (millicpu)'} + - {name: min_memory, type: String, optional: true, + description: 'Specify minimum memory requirement. For example 128974848, 129e6, 129M, 123Mi.'} + - {name: max_memory, type: String, optional: true, + description: 'Specify maximum memory requirement. For example 128974848, 129e6, 129M, 123Mi.'} +outputs: + - {name: rundetails, description: 'Details of the dkube run'} + - {name: artifact, description: 'Identifier in Dkube storage where artifacts of training are stored.'} +implementation: + container: + image: ocdr/dkubepl:3.8.5 + command: ['dkubepl'] + args: [ + training, + --accessurl, {inputValue: access_url}, + --token, {inputValue: auth_token}, + --container, {inputValue: container}, + --script, {inputValue: run_script}, + --program, {inputValue: program}, + --commit_id, {inputValue: commit_id}, + --datasets, {inputValue: datasets}, + --input_dataset_mounts, {inputValue: input_dataset_mounts}, + --input_dataset_versions, {inputValue: input_dataset_versions}, + --input_featuresets, {inputValue: featuresets}, + --input_featureset_mounts, {inputValue: input_featureset_mounts}, + --input_featureset_versions, {inputValue: input_featureset_versions}, + --models, {inputValue: models}, + --input_model_mounts, {inputValue: input_model_mounts}, + --input_model_versions, {inputValue: input_model_versions}, + --outputs, {inputValue: outputs}, + --output_mounts, {inputValue: output_mounts}, + --ngpus, {inputValue: ngpus}, + --nworkers, {inputValue: nworkers}, + --auto, {inputValue: auto_distribute}, + --config, {inputValue: config}, + --tuning, {inputValue: tuning}, + --envs, {inputValue: envs}, + --gdrdma, {inputValue: gdrdma}, + --job_group, {inputValue: job_group}, + --framework, {inputValue: framework}, + --version, {inputValue: version}, + --tags, {inputValue: tags}, + --min_cpu, {inputValue: min_cpu}, + --max_cpu, {inputValue: max_cpu}, + --min_memory, {inputValue: min_memory}, + --max_memory, {inputValue: max_memory}, + --runid, '{{pod.name}}', + --wfid, '{{workflow.uid}}' + ] + fileOutputs: + rundetails: /tmp/rundetails + artifact: /tmp/artifact diff --git a/kubeflow/components/viewer/component.yaml b/kubeflow/components/viewer/component.yaml new file mode 100644 index 0000000..3d01b4d --- /dev/null +++ b/kubeflow/components/viewer/component.yaml @@ -0,0 +1,39 @@ +name: dkube-viewer +description: | + Viewer component which renders UI as static HTML in KF Output viewer. + Support view types, + * A web application to test inference for Dkube examples Only. + * In future will add TFMA, Tensorboard etc.. +metadata: + annotations: {platform: 'Dkube'} + labels: {platform: 'Dkube', logger: 'dkubepl', wfid: '{{workflow.uid}}', runid: '{{pod.name}}', stage: 'viewer'} +inputs: + - {name: auth_token, type: String, optional: true, + description: 'Deprecated. Dkube authentication token.'} + - {name: servingurl, type: String, optional: true, + description: 'Required. URL where the model is deployed for serving in Dkube.'} + - {name: servingexample, type: String, optional: true, + description: 'Required. Name of the Dkube example which is deployed for serving. + Possible options - digits/catsdogs/bolts'} + - {name: viewtype, type: String, optional: true, default: 'inference', + description: 'Optional. Currently on viewtype=inference is only supported.'} + - {name: access_url, type: String, optional: true, default: '', + description: 'Optional. URL at which dkube is accessible, copy paste from the browser of this window. Required for cloud deployments.'} +outputs: + - {name: rundetails, description: 'Details of the run'} +implementation: + container: + image: ocdr/dkubepl:3.8.5 + command: ['dkubepl'] + args: [ + viewer, + --accessurl, {inputValue: access_url}, + --viewtype, {inputValue: viewtype}, + --token, {inputValue: auth_token}, + --servingurl, {inputValue: servingurl}, + --servingexample, {inputValue: servingexample}, + --runid, '{{pod.name}}', + --wfid, '{{workflow.uid}}' + ] + fileOutputs: + rundetails: /tmp/rundetails