Source code for dkube.sdk.api

"""

.. module:: DKubeAPI
   :synopsis: Helper class which provides high level methods for user to integrate at workflow level.

.. moduleauthor:: Ahmed Khan <github.com/mak-454>


"""

import json
import os
import time

import pandas as pd
import urllib3
from dkube.sdk.internal.api_base import *
from dkube.sdk.internal.dkube_api.models.conditions import \
    Conditions as TriggerCondition
from dkube.sdk.internal.dkube_api.rest import ApiException
from dkube.sdk.internal.files_base import *
from dkube.sdk.rsrcs import *
from dkube.sdk.rsrcs.featureset import DkubeFeatureSet, DKubeFeatureSetUtils
from dkube.sdk.rsrcs.modelmonitor import (DatasetClass, DkubeModelmonitoralert,
                                          DkubeModelmonitordataset)
from dkube.sdk.rsrcs.project import DkubeProject
from packaging import version as pversion

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class DkubeApi(ApiBase, FilesBase):

    """
    This class encapsulates all the high-level DKube workflow functions.::

        from dkube.sdk import *
        dapi = DkubeApi()

    *Inputs*

        URL
            FQDN endpoint at which the DKube platform is deployed::

                http://dkube-controller-master.dkube.cluster.local:5000
                https://dkube.ai:32222

            .. note:: If not provided then the value is picked from the *DKUBE_ACCESS_URL* env variable.
                If that is not found either, http://dkube-controller-master.dkube.cluster.local:5000 is
                used, assuming the access is internal to the DKube cluster.

        token
            Access token for the APIs, without which DKube will return 40x codes.

            .. note:: If not provided then the value is picked from the *DKUBE_ACCESS_TOKEN* env variable.
                ASSERTs if the env variable is not defined either.

        common_tags
            Tags which need to be applied to all the resources created using this API object.

        req_timeout
            Timeout for all the requests which are issued using this API object.

        req_retries
            Number of retries per request.

    """

    def __init__(
        self, URL=None, token=None, common_tags=[], req_timeout=None, req_retries=None
    ):

        self.url = URL
        if self.url is None:
            self.url = os.getenv(
                "DKUBE_ACCESS_URL",
                "http://dkube-controller-master.dkube.svc.cluster.local:5000",
            )
            self.files_url = os.getenv(
                "DKUBE_ACCESS_URL",
                "http://dkube-downloader.dkube.svc.cluster.local:9401",
            )
        else:
            self.files_url = self.url

        self.token = token
        if self.token is None:
            self.token = os.getenv("DKUBE_ACCESS_TOKEN", None)
            # Token must be resolvable from the argument or the environment
            assert (
                self.token is not None
            ), "TOKEN must be specified either by passing argument or by setting DKUBE_ACCESS_TOKEN env variable"

        ApiBase.__init__(self, self.url, self.token, common_tags)
        FilesBase.__init__(self, self.files_url, self.token)
        self.dkubeinfo = super().dkubeinfo()

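A minimal construction sketch. It uses only the constructor documented above; the URL and token values are placeholders::

    # Inside a DKube cluster, both values can come from the environment
    # (DKUBE_ACCESS_URL / DKUBE_ACCESS_TOKEN):
    api = DkubeApi()

    # Or pass them explicitly:
    api = DkubeApi(URL="https://dkube.ai:32222", token="<access-token>")
    claims = api.validate_token()   # returns the JWT claims for the token
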
    def set_active_project(self, project_id):
        """
        Set the active project. Any resources created using this API instance
        will belong to the given project.

        *Available in DKube Release: 2.2*

        *Inputs*

            project_id
                ID of the project. Pass None to unset.

        """
        self.common_tags = [
            tag for tag in self.common_tags if not tag.startswith("project:")
        ]
        if project_id:
            self.common_tags.append("project:" + str(project_id))

    def validate_token(self):
        """
        Method which can be used to validate the token.
        Returns the JWT claims, which contain the role assigned to the user.

        """
        return super().validate_token()

    def launch_jupyter_ide(self, ide: DkubeIDE, wait_for_completion=True):
        """
        Method to launch a Jupyter IDE on the DKube platform. Two kinds of IDE
        are supported, Jupyter Notebook & RStudio.
        Raises Exception in case of errors.

        *Inputs*

            ide
                Instance of :bash:`dkube.sdk.rsrcs.DkubeIDE` class.
                Please see the :bash:`Resources` section for details on this class.

            wait_for_completion
                When set to :bash:`True` this method will wait for the job to complete
                after submission. The IDE is declared complete once it is in one of the
                :bash:`running/failed/error` states.

        """
        assert (
            type(ide) == DkubeIDE
        ), "Invalid type for run, value must be instance of rsrcs:DkubeIDE class"
        super().launch_jupyter_ide(ide)
        while wait_for_completion:
            status = super().get_ide("notebook", ide.user, ide.name, fields="status")
            state, reason = status["state"], status["reason"]
            if state.lower() in ["running", "failed", "error"]:
                print("IDE {} - completed with state {} and reason {}".format(ide.name, state, reason))
                break
            else:
                print("IDE {} - waiting for completion, current state {}".format(ide.name, state))
                time.sleep(self.wait_interval)

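A hedged launch sketch. The :bash:`DkubeIDE` constructor mirroring :bash:`DkubeServing(user, name=...)` and the :bash:`update_image` helper are assumptions about the resource class, not confirmed by this module::

    # Assumptions: DkubeIDE(user, name=...) and update_image(...) exist in rsrcs.
    ide = DkubeIDE("demo-user", name="my-notebook")
    ide.update_image(image_url="ocdr/dkube-datascience-tf-cpu:v2.0.0")  # illustrative image
    api.launch_jupyter_ide(ide, wait_for_completion=True)
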
    def launch_rstudio_ide(self, ide: DkubeIDE, wait_for_completion=True):
        """
        Method to launch an RStudio IDE on the DKube platform. Two kinds of IDE
        are supported, Jupyter Notebook & RStudio.
        Raises Exception in case of errors.

        *Inputs*

            ide
                Instance of :bash:`dkube.sdk.rsrcs.DkubeIDE` class.
                Please see the :bash:`Resources` section for details on this class.

            wait_for_completion
                When set to :bash:`True` this method will wait for the job to complete
                after submission. The IDE is declared complete once it is in one of the
                :bash:`running/failed/error` states.

        """
        assert (
            type(ide) == DkubeIDE
        ), "Invalid type for run, value must be instance of rsrcs:DkubeIDE class"
        super().launch_rstudio_ide(ide)
        while wait_for_completion:
            status = super().get_ide("notebook", ide.user, ide.name, fields="status")
            state, reason = status["state"], status["reason"]
            if state.lower() in ["running", "failed", "error"]:
                print("IDE {} - completed with state {} and reason {}".format(ide.name, state, reason))
                break
            else:
                print("IDE {} - waiting for completion, current state {}".format(ide.name, state))
                time.sleep(self.wait_interval)

    def list_ides(self, user, shared=False, filters="*"):
        """
        Method to list all the IDEs of a user.
        Raises exception on any connection errors.

        *Inputs*

            user
                User whose IDE instances must be fetched. If the token belongs to a
                different user, then that token should have permission to fetch the
                IDEs of the :bash:`user` in the input. They should be in the same
                DKube group.

            filters
                Only :bash:`*` is supported now. The user will be able to filter
                runs based on state or duration.

        """
        return super().list_ides("notebook", user, shared)

    def delete_ide(self, user, name, wait_for_completion=True):
        """
        Method to delete an IDE.
        Raises exception if the token is of a different user, if an IDE with the
        name doesn't exist, or on any connection errors.

        *Inputs*

            user
                The token must belong to this user, as an IDE instance of a
                different user cannot be deleted.

            name
                Name of the IDE which needs to be deleted.

            wait_for_completion
                When set to :bash:`True` this method will wait for the IDE to get deleted.

        """
        data = super().get_ide("notebook", user, name, fields="*")
        uuid = data["job"]["parameters"]["generated"]["uuid"]
        ret = super().delete_ide("notebook", user, name)
        if wait_for_completion:
            self._wait_for_rundelete_completion(uuid, "notebook", name)
        return ret

    def create_training_run(self, run: DkubeTraining, wait_for_completion=True):
        """
        Method to create a training run on DKube.
        Raises Exception in case of errors.

        *Inputs*

            run
                Instance of :bash:`dkube.sdk.rsrcs.Training` class.
                Please see the :bash:`Resources` section for details on this class.

            wait_for_completion
                When set to :bash:`True` this method will wait for the job to complete
                after submission. The job is declared complete once it is in one of the
                :bash:`complete/failed/error` states.

        """
        assert (
            type(run) == DkubeTraining
        ), "Invalid type for run, value must be instance of rsrcs:DkubeTraining class"
        valid_fw = False
        fw_opts = ["custom"]
        if run.executor_dkube_framework.choice == "custom":
            valid_fw = True
        else:
            fws = self.get_training_capabilities()
            for fw in fws:
                for v in fw["versions"]:
                    if (
                        run.executor_dkube_framework.choice == fw["name"]
                        and run.dkube_framework_details.version == v["name"]
                    ):
                        valid_fw = True
                        break
                    else:
                        name = fw["name"] + "_" + v["name"]
                        fw_opts.append(name)
                if valid_fw == True:
                    break
        assert valid_fw == True, (
            "Invalid choice for framework, select oneof(" + str(fw_opts) + ")"
        )
        super().update_tags(run.training_def)
        super().create_run(run)
        while wait_for_completion:
            status = {}
            try:
                status = super().get_run("training", run.user, run.name, fields="status")
            except ValueError as ve:
                ve_without_num = "".join(i for i in str(ve) if not i.isdigit())
                if "Invalid value for `state` (Waiting for gpu(s))" in ve_without_num:
                    num = "".join(i for i in str(ve) if i.isdigit())
                    status["state"] = "Waiting for {} gpu(s)".format(num)
                    status["reason"] = ""
                else:
                    raise ve
            state, reason = status["state"], status["reason"]
            if state.lower() in ["complete", "failed", "error", "stopped", "created"]:
                print("run {} - completed with state {} and reason {}".format(run.name, state, reason))
                break
            else:
                print("run {} - waiting for completion, current state {}".format(run.name, state))
                time.sleep(self.wait_interval)

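A hedged submission sketch. The :bash:`DkubeTraining` constructor shape and the helper names used on it are assumptions about the resource class; the framework string follows the :bash:`name_version` form produced by :bash:`list_frameworks()` below::

    # Assumptions: DkubeTraining(user, name=...), update_container(...), add_code(...).
    run = DkubeTraining("demo-user", name="mnist-train")
    run.update_container(framework="tensorflow_2.0.0", image_url="")  # hypothetical args
    run.add_code("mnist-code")   # pre-created code repo
    api.create_training_run(run, wait_for_completion=True)
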
    def get_training_run(self, user, name):
        """
        Method to fetch the training run with the given name for the given user.
        Raises exception if the run is not found or on any other connection errors.

        *Inputs*

            user
                User whose training run has to be fetched. If the token belongs to a
                different user, then that token should have permission to fetch the
                training run of the :bash:`user` in the input. They should be in the
                same DKube group.

            name
                Name of the training run to be fetched.

        """
        return super().get_run("training", user, name)

    def list_training_runs(self, user, shared=False, filters="*"):
        """
        Method to list all the training runs of a user.
        Raises exception on any connection errors.

        *Inputs*

            user
                User whose training runs must be fetched. If the token belongs to a
                different user, then that token should have permission to fetch the
                training runs of the :bash:`user` in the input. They should be in the
                same DKube group.

            filters
                Only :bash:`*` is supported now. The user will be able to filter
                runs based on state or duration.

        """
        return super().list_runs("training", user, shared)

    def delete_training_run(self, user, name, wait_for_completion=True):
        """
        Method to delete a training run.
        Raises exception if the token is of a different user, if a training run
        with the name doesn't exist, or on any connection errors.

        *Inputs*

            user
                The token must belong to this user, as a run of a different user
                cannot be deleted.

            name
                Name of the run which needs to be deleted.

            wait_for_completion
                When set to :bash:`True` this method will wait for the training run
                to get deleted.

        """
        data = super().get_run("training", user, name, fields="*")
        uuid = data["job"]["parameters"]["generated"]["uuid"]
        ret = super().delete_run("training", user, name)
        if wait_for_completion:
            self._wait_for_rundelete_completion(uuid, "training", name)
        return ret

    def create_preprocessing_run(
        self, run: DkubePreprocessing, wait_for_completion=True
    ):
        """
        Method to create a preprocessing run on DKube.
        Raises Exception in case of errors.

        *Inputs*

            run
                Instance of :bash:`dkube.sdk.rsrcs.Preprocessing` class.
                Please see the :bash:`Resources` section for details on this class.

            wait_for_completion
                When set to :bash:`True` this method will wait for the job to complete
                after submission. The job is declared complete once it is in one of the
                :bash:`complete/failed/error` states.

        """
        assert (
            type(run) == DkubePreprocessing
        ), "Invalid type for run, value must be instance of rsrcs:DkubePreprocessing class"
        super().update_tags(run.pp_def)
        super().create_run(run)
        while wait_for_completion:
            status = {}
            try:
                status = super().get_run("preprocessing", run.user, run.name, fields="status")
            except ValueError as ve:
                ve_without_num = "".join(i for i in str(ve) if not i.isdigit())
                if "Invalid value for `state` (Waiting for gpu(s))" in ve_without_num:
                    num = "".join(i for i in str(ve) if i.isdigit())
                    status["state"] = "Waiting for {} gpu(s)".format(num)
                    status["reason"] = ""
                else:
                    raise ve
            state, reason = status["state"], status["reason"]
            if state.lower() in ["complete", "failed", "error", "stopped"]:
                print("run {} - completed with state {} and reason {}".format(run.name, state, reason))
                break
            else:
                print("run {} - waiting for completion, current state {}".format(run.name, state))
                time.sleep(self.wait_interval)

    def get_preprocessing_run(self, user, name):
        """
        Method to fetch the preprocessing run with the given name for the given user.
        Raises exception if the run is not found or on any other connection errors.

        *Inputs*

            user
                User whose preprocessing run has to be fetched. If the token belongs
                to a different user, then that token should have permission to fetch
                the preprocessing run of the :bash:`user` in the input. They should
                be in the same DKube group.

            name
                Name of the preprocessing run to be fetched.

        """
        return super().get_run("preprocessing", user, name)

    def list_preprocessing_runs(self, user, shared=False, filters="*"):
        """
        Method to list all the preprocessing runs of a user.
        Raises exception on any connection errors.

        *Inputs*

            user
                User whose preprocessing runs must be fetched. If the token belongs
                to a different user, then that token should have permission to fetch
                the preprocessing runs of the :bash:`user` in the input. They should
                be in the same DKube group.

            filters
                Only :bash:`*` is supported now. The user will be able to filter
                runs based on state or duration.

        """
        return super().list_runs("preprocessing", user, shared)

    def delete_preprocessing_run(self, user, name, wait_for_completion=True):
        """
        Method to delete a preprocessing run.
        Raises exception if the token is of a different user, if a preprocessing
        run with the name doesn't exist, or on any connection errors.

        *Inputs*

            user
                The token must belong to this user, as a run of a different user
                cannot be deleted.

            name
                Name of the run which needs to be deleted.

            wait_for_completion
                When set to :bash:`True` this method will wait for the preprocessing
                run to get deleted.

        """
        data = super().get_run("preprocessing", user, name, fields="*")
        uuid = data["job"]["parameters"]["generated"]["uuid"]
        ret = super().delete_run("preprocessing", user, name)
        if wait_for_completion:
            self._wait_for_rundelete_completion(uuid, "preprocessing", name)
        return ret

    def update_inference(self, run: DkubeServing, wait_for_completion=True):
        """
        Method to update a test inference/deployment in DKube.
        Raises Exception in case of errors.

        *Inputs*

            run
                Instance of :bash:`dkube.sdk.rsrcs.serving` class.
                Please see the :bash:`Resources` section for details on this class.
                Defaults for the predictor and transformer configs are picked from
                the existing inference deployment. If a version is not specified,
                the deployment is updated to the latest version.

            wait_for_completion
                When set to :bash:`True` this method will wait for the job to complete
                after submission. The job is declared complete once it is in one of the
                :bash:`complete/failed/error` states.

        """
        inference = super().get_run("inference", run.user, run.name)
        inference = inference["job"]["parameters"]["inference"]
        if run.predictor.image == None:
            run.update_serving_image(
                None,
                inference["serving_image"]["image"]["path"],
                inference["serving_image"]["image"]["username"],
                inference["serving_image"]["image"]["password"],
            )
        if run.serving_def.model == None:
            run.serving_def.model = inference["model"]
        if run.serving_def.version == None:
            run.serving_def.version = inference["version"]
        if inference["transformer"] == True and run.serving_def.transformer == False:
            run.update_transformer_image(
                inference["transformer_image"]["image"]["path"],
                inference["transformer_image"]["image"]["username"],
                inference["transformer_image"]["image"]["password"],
            )
            run.set_transformer(True, script=inference["transformer_code"])
            run.update_transformer_code(
                inference["transformer_project"], inference["transformer_commit_id"]
            )
        elif inference["transformer"] == False and run.serving_def.transformer == True:
            li = self.get_model_lineage(
                run.serving_def.owner, run.serving_def.model, run.serving_def.version
            )
            if run.transformer.image == None:
                ti = li["run"]["parameters"]["generated"]["training_image"]["image"]
                run.update_transformer_image(ti["path"], ti["username"], ti["password"])
            if run.serving_def.transformer_project == None:
                code = li["run"]["parameters"]["training"]["datums"]["workspace"]["data"]
                name = code["name"].split(":")[1]
                run.update_transformer_code(name, code["version"])
        if run.serving_def.min_replicas == 0:
            run.serving_def.min_replicas = inference["minreplicas"]
        if run.serving_def.max_concurrent_requests == 0:
            run.serving_def.max_concurrent_requests = inference["maxconcurrentrequests"]
        if super().is_model_catalog_enabled() == True:
            run.serving_def.deploy = inference["deploy"]
        else:
            run.serving_def.deploy = None
        super().update_inference(run)
        while wait_for_completion:
            status = super().get_run("inference", run.user, run.name, fields="status")
            state, reason = status["state"], status["reason"]
            if state.lower() in ["complete", "failed", "error", "running", "stopped"]:
                print("run {} - completed with state {} and reason {}".format(run.name, state, reason))
                break
            else:
                print("run {} - waiting for completion, current state {}".format(run.name, state))
                time.sleep(self.wait_interval)

    def create_test_inference(self, run: DkubeServing, wait_for_completion=True):
        """
        Method to create a test inference on DKube.
        Raises Exception in case of errors.

        *Inputs*

            run
                Instance of :bash:`dkube.sdk.rsrcs.serving` class.
                Please see the :bash:`Resources` section for details on this class.

                If the serving image is not updated in the :bash:`run:DkubeServing` argument then,

                - If training used a supported standard framework, DKube will pick the appropriate serving image
                - If training used a custom image, DKube will try to use the same image for serving

                If the transformer image is not updated in :bash:`run:DkubeServing` then,

                - DKube will use the same image as the training image

                If the transformer code is not updated in :bash:`run:DkubeServing` then,

                - DKube will use the code used for training

            wait_for_completion
                When set to :bash:`True` this method will wait for the job to complete
                after submission. The job is declared complete once it is in one of the
                :bash:`complete/failed/error` states.

        """
        assert (
            type(run) == DkubeServing
        ), "Invalid type for run, value must be instance of rsrcs:DkubeServing class"
        # Fetch training run details and fill in information for serving
        if (
            run.predictor.image == None
            or (run.serving_def.transformer == True and run.transformer.image == None)
            or (
                run.serving_def.transformer == True
                and run.serving_def.transformer_project == None
            )
        ):
            if run.serving_def.version == None:
                v = self.get_model_latest_version(
                    run.serving_def.owner, run.serving_def.model
                )
                run.serving_def.version = v["uuid"]
            li = self.get_model_lineage(
                run.serving_def.owner, run.serving_def.model, run.serving_def.version
            )
            if li == None or li["run"] == None:
                li = None
            if li == None and run.predictor.image == None:
                raise Exception("Lineage is nil, predictor image must be provided.")
            if li != None and run.predictor.image == None:
                si = li["run"]["parameters"]["generated"]["serving_image"]["image"]
                run.update_serving_image(None, si["path"], si["username"], si["password"])
            if (
                li != None
                and run.serving_def.transformer == True
                and run.transformer.image == None
            ):
                ti = li["run"]["parameters"]["generated"]["training_image"]["image"]
                run.update_transformer_image(ti["path"], ti["username"], ti["password"])
            if (
                li != None
                and run.serving_def.transformer == True
                and run.serving_def.transformer_project == None
            ):
                code = li["run"]["parameters"]["training"]["datums"]["workspace"]["data"]
                name = code["name"].split(":")[1]
                run.update_transformer_code(name, code["version"])
        # Don't allow prod deploy using this API, if MODEL_CATALOG_ENABLED=true
        if (
            run.serving_def.deploy == True
            and super().is_model_catalog_enabled() == True
        ):
            run.serving_def.deploy = None
        super().create_run(run)
        while wait_for_completion:
            status = super().get_run("inference", run.user, run.name, fields="status")
            state, reason = status["state"], status["reason"]
            if state.lower() in ["complete", "failed", "error", "running", "stopped"]:
                print("run {} - completed with state {} and reason {}".format(run.name, state, reason))
                break
            else:
                print("run {} - waiting for completion, current state {}".format(run.name, state))
                time.sleep(self.wait_interval)

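A sketch of creating a test inference. The :bash:`DkubeServing(user, name=...)` constructor and :bash:`update_serving_model` are the same calls used by :bash:`create_model_deployment` later in this module; the names are placeholders::

    srv = DkubeServing("demo-user", name="mnist-inf")
    srv.update_serving_model("mnist-model")   # version defaults to the latest
    api.create_test_inference(srv, wait_for_completion=True)
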
    def get_test_inference(self, user, name):
        """
        Method to fetch the test inference with the given name for the given user.
        Raises exception if the run is not found or on any other connection errors.

        *Inputs*

            user
                User whose test inference has to be fetched. If the token belongs to
                a different user, then that token should have permission to fetch the
                serving run of the :bash:`user` in the input. They should be in the
                same DKube group.

            name
                Name of the serving run to be fetched.

        """
        return super().get_run("inference", user, name)

    def list_test_inferences(self, user, shared=False, filters="*"):
        """
        Method to list all the test inferences of a user.
        Raises exception on any connection errors.

        *Inputs*

            user
                User whose test inferences must be fetched. If the token belongs to
                a different user, then that token should have permission to fetch the
                serving runs of the :bash:`user` in the input. They should be in the
                same DKube group.

            filters
                Only :bash:`*` is supported now. The user will be able to filter
                runs based on state or duration.

        """
        return super().list_runs("inference", user, shared)

    def delete_test_inference(self, user, name, wait_for_completion=True):
        """
        Method to delete a test inference.
        Raises exception if the token is of a different user, if a serving run
        with the name doesn't exist, or on any connection errors.

        *Inputs*

            user
                The token must belong to this user, as a run of a different user
                cannot be deleted.

            name
                Name of the run which needs to be deleted.

            wait_for_completion
                When set to :bash:`True` this method will wait for the inference
                to get deleted.

        """
        data = super().get_run("inference", user, name, fields="*")
        uuid = data["job"]["parameters"]["generated"]["uuid"]
        ret = super().delete_run("inference", user, name)
        if wait_for_completion:
            self._wait_for_rundelete_completion(uuid, "inference", name)
        return ret

    def create_code(self, code: DkubeCode, wait_for_completion=True):
        """
        Method to create a code repo on DKube.
        Raises Exception in case of errors.

        *Inputs*

            code
                Instance of :bash:`dkube.sdk.rsrcs.code` class.
                Please see the :bash:`Resources` section for details on this class.

            wait_for_completion
                When set to :bash:`True` this method will wait for the code resource
                to get into one of the complete states. The code repo is declared
                complete once it is in one of the :bash:`complete/failed/error` states.

        """
        assert (
            type(code) == DkubeCode
        ), "Invalid type for run, value must be instance of rsrcs:DkubeCode class"
        super().create_repo(code)
        while wait_for_completion:
            status = super().get_repo("program", code.user, code.name, fields="status")
            state, reason = status["state"], status["reason"]
            if state.lower() in ["ready", "failed", "error"]:
                print("code {} - completed with state {} and reason {}".format(code.name, state, reason))
                break
            else:
                print("code {} - waiting for completion, current state {}".format(code.name, state))
                time.sleep(self.wait_interval)

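A hedged sketch of registering a code repo. The :bash:`DkubeCode` constructor and :bash:`update_git_details` helper are assumptions about the resource class::

    # Assumptions: DkubeCode(user, name=...) and update_git_details(...).
    code = DkubeCode("demo-user", name="mnist-code")
    code.update_git_details("https://github.com/oneconvergence/dkube-examples.git")
    api.create_code(code, wait_for_completion=True)
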
    def get_code(self, user, name):
        """
        Method to fetch the code repo with the given name for the given user.
        Raises exception if the code repo is not found or on any other
        connection errors.

        *Inputs*

            user
                User whose code repo has to be fetched. If the token belongs to a
                different user, then that token should have permission to fetch the
                code repo of the :bash:`user` in the input. They should be in the
                same DKube group.

            name
                Name of the code repo to be fetched.

        """
        return super().get_repo("program", user, name)

    def list_code(self, user, shared=False, filters="*"):
        """
        Method to list all the code repos of a user.
        Raises exception on any connection errors.

        *Inputs*

            user
                User whose code repos must be fetched. If the token belongs to a
                different user, then that token should have permission to fetch the
                code repos of the :bash:`user` in the input. They should be in the
                same DKube group.

            filters
                Only :bash:`*` is supported now. The user will be able to filter
                code repos based on state or the source.

        """
        return super().list_repos("program", user, shared)

    def delete_code(self, user, name, force=False):
        """
        Method to delete a code repo.
        Raises exception if the token is of a different user, if a code repo with
        the name doesn't exist, or on any connection errors.

        *Inputs*

            user
                The token must belong to this user, as a code repo of a different
                user cannot be deleted.

            name
                Name of the code repo which needs to be deleted.

        """
        super().delete_repo("program", user, name, force=force)

    ################### Feature Store ############################

    def create_featureset(self, featureset: DkubeFeatureSet, wait_for_completion=True):
        """
        Method to create a featureset on DKube.

        *Available in DKube Release: 2.2*

        *Inputs*

            featureset
                Instance of :bash:`dkube.sdk.rsrcs.featureSet` class.
                Please see the :bash:`Resources` section for details on this class.

            wait_for_completion
                When set to :bash:`True` this method will wait for the featureset
                resource to be ready, i.e. created with a v1 version in the
                :bash:`synced` state.

        *Outputs*

            A dictionary object with response status

        """
        assert (
            type(featureset) == DkubeFeatureSet
        ), "Invalid type for run, value must be instance of rsrcs:DkubeFeatureSet class"
        response = super().create_featureset(featureset)
        if response["code"] == 200 and featureset.featurespec_path is not None:
            spec_response = super().featureset_upload_featurespec(
                featureset.featureset.name, featureset.featurespec_path
            )
            if spec_response["code"] != 200:
                self.delete_featureset(featureset.featureset.name)
                return spec_response
        while wait_for_completion:
            versions = super().get_featureset_versions(featureset.featureset.name)
            if versions is None:
                print("create_featureset: waiting for featureset to be setup")
                time.sleep(self.wait_interval)
                continue
            version_status = DKubeFeatureSetUtils().get_version_status(versions, "v1")
            if version_status.lower() == "synced":
                break
            print("create_featureset: waiting for featureset to be setup")
            time.sleep(self.wait_interval)
        return response

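A hedged creation sketch. The method above reads :bash:`featureset.featureset.name` and :bash:`featureset.featurespec_path`, so a name-taking constructor is assumed::

    # Assumption: DkubeFeatureSet(name=...) constructor.
    fs = DkubeFeatureSet(name="mnist-fs")
    resp = api.create_featureset(fs, wait_for_completion=True)
    print(resp)
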
    def delete_featuresets(self, featureset_list):
        """
        Method to delete a list of featuresets on DKube.
        Raises Exception in case of errors.

        *Available in DKube Release: 2.2*

        *Inputs*

            featureset_list
                List of featureset names.
                example: ["mnist-fs", "titanic-fs"]

        *Outputs*

            A dictionary object with response status with the list of deleted
            featureset names

        """
        assert (
            featureset_list
            and isinstance(featureset_list, list)
            and all(isinstance(featureset, str) for featureset in featureset_list)
        ), "Invalid parameter, value must be a list of featureset names"
        return super().delete_featureset(featureset_list)

    def delete_featureset(self, name):
        """
        Method to delete a featureset.

        *Available in DKube Release: 2.2*

        *Inputs*

            name
                Name of the featureset to be deleted.
                example: "mnist-fs"

        *Outputs*

            A dictionary object with response status and the deleted featureset name

        """
        assert name and isinstance(
            name, str
        ), "Invalid parameter, value must be a featureset name"
        return super().delete_featureset([name])

    def commit_featureset(self, **kwargs):
        """
        Method to commit sticky featuresets.

        The featureset should be in the ready state. It will be in the created
        state if no featurespec has been uploaded.

        If the featureset is in the created state, the following happens.

            a) If metadata is passed, it is uploaded as the featurespec.
            b) If no metadata is passed, it is derived from df and uploaded.

        If the featureset is in the ready state, the following happens.

            a) Any metadata passed is ignored.
            b) The featurespec is downloaded for the specified featureset and df
               is validated for conformance.

        If name is specified, it derives the path for committing the features.
        If path is also specified, it doesn't derive the path; it uses the
        specified path. However, the path should be a mount path into the DKube
        store. If df is not specified, it assumes the df is already written to
        the featureset path. Features can be written to the featureset mount
        path using DkubeFeatureSet.write_features.

        *Available in DKube Release: 2.2*

        *Inputs*

            name
                Featureset name or None.
                example: name='fset'

            df
                Dataframe with the features to be written.
                None or an empty df is invalid.
                type: pandas.DataFrame

            metadata
                Optional yaml object with name, description and schema fields, or None.
                example: metadata=[{'name':age, 'description:'', 'schema':int64}]

            path
                Mount path where the featureset is mounted, or None.
                example: path='/opt/dkube/fset'

        *Outputs*

            Dictionary with response status

        """
        name = kwargs.get("name", None)
        df = kwargs.get("df", None)
        metadata = kwargs.get("metadata", None)
        path = kwargs.get("path", None)
        merge = kwargs.get("merge", True)
        dftype = kwargs.get("dftype", "Py")

        if df is not None:
            assert not df.empty, "df should not be empty"
        else:
            # Todo: Handle commit for featuresets mounted as k8s volumes
            assert name or path, "name or path should be specified"

        featurespec = None
        existing_spec = []
        if name is not None:
            featurespec, valid = super().get_featurespec(name)
            assert valid, "featureset not found"
            if featurespec:
                existing_spec = featurespec
        if merge is False:
            existing_spec = []
        if (dftype == "Py") and (
            (len(existing_spec) != len(df.keys()))
            and (name is not None)
            and (df is not None)
        ):
            if not metadata:
                metadata = DKubeFeatureSetUtils().compute_features_metadata(
                    df, existing_spec
                )
                assert metadata, "The specified featureset is invalid"
            self.upload_featurespec(featureset=name, filepath=None, metadata=metadata)
            featurespec = metadata
        if (dftype == "Py") and (featurespec is not None) and (df is not None):
            isdf_valid = DKubeFeatureSetUtils().validate_features(df, featurespec)
            assert isdf_valid, "DataFrame validation failed"
        return super().commit_featureset(name, df, path, dftype)

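A commit sketch built from the parameter examples in the docstring above; the feature columns are illustrative::

    import pandas as pd

    df = pd.DataFrame({"age": [21, 34], "fare": [7.25, 71.28]})
    resp = api.commit_featureset(name="fset", df=df)
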
    def read_featureset(self, **kwargs):
        """
        Method to read a featureset version.
        If name is specified, the path is derived. If the featureset is not
        mounted, a copy is made to the user's home directory.
        If path is specified, it should be a mounted path.

        *Available in DKube Release: 2.2*

        *Inputs*

            name
                Featureset to be read.
                example: name='fset' or None

            version
                Version to be read. If no version is specified, the latest
                version is assumed.
                example: version='v2' or None

            path
                Path where the featureset is mounted.
                example: path='/opt/dkube/fset' or None

        *Outputs*

            Dataframe object

        """
        name = kwargs.get("name", None)
        version = kwargs.get("version", None)
        path = kwargs.get("path", None)
        dftype = kwargs.get("dftype", "Py")

        assert (version == None) or isinstance(version, str), "version must be a string"
        return super().read_featureset(name, version, path, dftype)

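A read sketch, again following the docstring examples::

    df_latest = api.read_featureset(name="fset")             # latest version
    df_v2 = api.read_featureset(name="fset", version="v2")   # specific version
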
    def list_featuresets(self, query=None):
        """
        Method to list featuresets based on a query string.
        Raises Exception in case of errors.

        *Available in DKube Release: 2.2*

        *Inputs*

            query
                A query string that is compatible with the Bleve search format.

        *Outputs*

            A dictionary object with response status and the list of featuresets

        """
        return super().list_featureset(query)

    def upload_featurespec(self, featureset=None, filepath=None, metadata=None):
        """
        Method to upload a feature specification file.

        *Available in DKube Release: 2.2*

        *Inputs*

            featureset
                The name of the featureset.

            filepath
                Filepath for the feature specification metadata yaml file.

            metadata
                Feature specification as a yaml object.

            One of filepath or metadata should be specified.

        *Outputs*

            A dictionary object with response status

        """
        assert featureset and isinstance(featureset, str), "featureset must be string"
        assert bool(filepath) ^ bool(
            metadata
        ), "One of filepath and metadata should be specified"
        return super().featureset_upload_featurespec(featureset, filepath, metadata)

    def get_featureset(self, featureset=None):
        """
        Method to retrieve the details of a featureset.

        *Available in DKube Release: 2.2*

        *Inputs*

            featureset
                The name of the featureset.

        *Outputs*

            A dictionary object with response status, featureset metadata and
            feature versions

        """
        return super().get_featureset(featureset)

    def get_featurespec(self, featureset=None):
        """
        Method to retrieve the feature specification of a featureset.

        *Available in DKube Release: 2.2*

        *Inputs*

            featureset
                The name of the featureset.

        *Outputs*

            A dictionary object with response status and feature specification
            metadata

        """
        return super().get_featurespec(featureset)

    ###############################################################

    def create_dataset(self, dataset: DkubeDataset, wait_for_completion=True):
        """
        Method to create a dataset on DKube.
        Raises Exception in case of errors.

        *Inputs*

            dataset
                Instance of :bash:`dkube.sdk.rsrcs.dataset` class.
                Please see the :bash:`Resources` section for details on this class.

            wait_for_completion
                When set to :bash:`True` this method will wait for the dataset
                resource to get into one of the complete states. The dataset is
                declared complete once it is in one of the
                :bash:`complete/failed/error` states.

        """
        assert (
            type(dataset) == DkubeDataset
        ), "Invalid type for run, value must be instance of rsrcs:DkubeDataset class"
        super().create_repo(dataset)
        if dataset.datum.remote == True:
            wait_for_completion = False
        while wait_for_completion:
            status = super().get_repo("dataset", dataset.user, dataset.name, fields="status")
            state, reason = status["state"], status["reason"]
            if state.lower() in ["ready", "failed", "error"]:
                print("dataset {} - completed with state {} and reason {}".format(dataset.name, state, reason))
                break
            else:
                print("dataset {} - waiting for completion, current state {}".format(dataset.name, state))
                time.sleep(self.wait_interval)

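A hedged sketch of creating a git-sourced dataset; the :bash:`DkubeDataset` constructor and source helpers are assumptions about the resource class::

    # Assumptions: DkubeDataset(user, name=...), update_dataset_source(...),
    # update_git_details(...).
    ds = DkubeDataset("demo-user", name="mnist-ds")
    ds.update_dataset_source(source="git")
    ds.update_git_details("https://github.com/oneconvergence/dkube-examples.git")
    api.create_dataset(ds, wait_for_completion=True)
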
    def get_dataset(self, user, name):
        """
        Method to fetch the dataset with the given name for the given user.
        Raises exception if the dataset is not found or on any other connection
        errors.

        *Inputs*

            user
                User whose dataset has to be fetched. If the token belongs to a
                different user, then that token should have permission to fetch the
                dataset of the :bash:`user` in the input. They should be in the same
                DKube group.

            name
                Name of the dataset to be fetched.

        """
        return super().get_repo("dataset", user, name)

    def list_datasets(self, user, shared=False, filters="*"):
        """
        Method to list all the datasets of a user.
        Raises exception on any connection errors.

        *Inputs*

            user
                User whose datasets must be fetched. If the token belongs to a
                different user, then that token should have permission to fetch the
                datasets of the :bash:`user` in the input. They should be in the same
                DKube group.

            filters
                Only :bash:`*` is supported now. The user will be able to filter
                datasets based on state or the source.

        """
        return super().list_repos("dataset", user, shared)

    def delete_dataset(self, user, name, force=False):
        """
        Method to delete a dataset.
        Raises exception if the token is of a different user, if a dataset with
        the name doesn't exist, or on any connection errors.

        *Inputs*

            user
                The token must belong to this user, as a dataset of a different
                user cannot be deleted.

            name
                Name of the dataset which needs to be deleted.

        """
        super().delete_repo("dataset", user, name, force=force)

    def create_model(self, model: DkubeModel, wait_for_completion=True):
        """
        Method to create a model on DKube.
        Raises Exception in case of errors.

        *Inputs*

            model
                Instance of :bash:`dkube.sdk.rsrcs.model` class.
                Please see the :bash:`Resources` section for details on this class.

            wait_for_completion
                When set to :bash:`True` this method will wait for the model resource
                to get into one of the complete states. The model is declared complete
                once it is in one of the :bash:`complete/failed/error` states.

        """
        assert (
            type(model) == DkubeModel
        ), "Invalid type for run, value must be instance of rsrcs:DkubeModel class"
        super().create_repo(model)
        while wait_for_completion:
            status = super().get_repo("model", model.user, model.name, fields="status")
            state, reason = status["state"], status["reason"]
            if state.lower() in ["ready", "failed", "error"]:
                print("model {} - completed with state {} and reason {}".format(model.name, state, reason))
                break
            else:
                print("model {} - waiting for completion, current state {}".format(model.name, state))
                time.sleep(self.wait_interval)

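A hedged sketch; the :bash:`DkubeModel` constructor and :bash:`update_model_source` helper are assumptions about the resource class::

    # Assumptions: DkubeModel(user, name=...) and update_model_source(...).
    model = DkubeModel("demo-user", name="mnist-model")
    model.update_model_source(source="dvs")   # hypothetical: DKube-managed storage
    api.create_model(model, wait_for_completion=True)
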
    def get_model(self, user, name, publish_details=False):
        """
        Method to fetch the model with the given name for the given user.
        Raises exception if the model is not found or on any other connection
        errors.

        *Inputs*

            user
                User whose model has to be fetched. If the token belongs to a
                different user, then that token should have permission to fetch the
                model of the :bash:`user` in the input. They should be in the same
                DKube group.

            name
                Name of the model to be fetched.

            publish_details
                When set to :bash:`True`, model catalog publish details are included
                for published versions (DKube 2.3.0.0 and later).

        """
        dkubever = self.dkubeinfo["version"]
        if (
            pversion.parse(dkubever) < pversion.parse("2.3.0.0")
        ) or publish_details == False:
            return super().get_repo("model", user, name)
        else:
            modelObj = super().get_repo("model", user, name)
            versions = modelObj["versions"]
            for v in versions:
                if v["version"]["model"]["stage"] == "PUBLISHED":
                    publish = super().get_model_catalog(user, name)
                    modelObj["publish_details"] = publish
                    return modelObj
            return modelObj

    def list_models(self, user, shared=False, published=False, filters="*"):
        """
        Method to list all the models of a user.
        Raises exception on any connection errors.

        *Inputs*

            user
                User whose models must be fetched. If the token belongs to a
                different user, then that token should have permission to fetch the
                models of the :bash:`user` in the input. They should be in the same
                DKube group.

            filters
                Only :bash:`*` is supported now. The user will be able to filter
                models based on state or the source.

            published
                If published is set to True, all published models are returned.

        """
        if published == True:
            return super().list_published_models(user)
        return super().list_repos("model", user, shared)

    def delete_model(self, user, name, force=False):
        """
        Method to delete a model.
        Raises exception if the token is of a different user, if a model with the
        name doesn't exist, or on any connection errors.

        *Inputs*

            user
                The token must belong to this user, as a model of a different user
                cannot be deleted.

            name
                Name of the model which needs to be deleted.

        """
        super().delete_repo("model", user, name, force=force)

    def trigger_runs_bycode(self, code, user):
        """
        Method to trigger all the runs in DKube which use the mentioned code.

        *Inputs*

            code
                Name of the code.

            user
                Owner of the code. All runs of this user will be retriggered.

        """
        condition = TriggerCondition(match="code", name=code, user=user)
        return super().trigger_runs(condition)

    def trigger_runs_bydataset(self, dataset, user):
        """
        Method to trigger all the runs in DKube which use the mentioned dataset
        as input.

        *Inputs*

            dataset
                Name of the dataset.

            user
                Owner of the dataset. All runs of this user will be retriggered.

        """
        condition = TriggerCondition(match="dataset", name=dataset, user=user)
        return super().trigger_runs(condition)

    def trigger_runs_bymodel(self, model, user):
        """
        Method to trigger all the runs in DKube which use the mentioned model
        as input.

        *Inputs*

            model
                Name of the model.

            user
                Owner of the model. All runs of this user will be retriggered.

        """
        condition = TriggerCondition(match="model", name=model, user=user)
        return super().trigger_runs(condition)

    def get_model_lineage(self, user, name, version):
        """
        Method to get the lineage of a model version.

        *Inputs*

            name
                Name of the model.

            version
                Version of the model.

            user
                Owner of the model.

        """
        return super().get_datum_lineage("model", user, name, version)

    def get_dataset_lineage(self, user, name, version):
        """
        Method to get the lineage of a dataset version.

        *Inputs*

            name
                Name of the dataset.

            version
                Version of the dataset.

            user
                Owner of the dataset.

        """
        return super().get_datum_lineage("dataset", user, name, version)

    def get_training_run_lineage(self, user, name):
        """
        Method to get the lineage of a training run.

        *Inputs*

            name
                Name of the run.

            user
                Owner of the run.

        """
        # Get the training run info
        run = self.get_training_run(user, name)
        runid = run["job"]["parameters"]["generated"]["uuid"]
        return super().get_run_lineage("training", user, runid)

    def get_preprocessing_run_lineage(self, user, name):
        """
        Method to get the lineage of a preprocessing run.

        *Inputs*

            name
                Name of the run.

            user
                Owner of the run.

        """
        # Get the preprocessing run info
        run = self.get_preprocessing_run(user, name)
        runid = run["job"]["parameters"]["generated"]["uuid"]
        return super().get_run_lineage("preprocessing", user, runid)

    def get_model_versions(self, user, name):
        """
        Method to get the versions of a model.
        Versions are returned in descending order, latest first.

        *Inputs*

            name
                Name of the model.

            user
                Owner of the model.

        """
        model = self.get_model(user, name)
        return model["versions"]

    def get_model_latest_version(self, user, name):
        """
        Method to get the latest version of the given model.

        *Inputs*

            name
                Name of the model.

            user
                Owner of the model.

        """
        versions = self.get_model_versions(user, name)
        return versions[0]["version"]

    def get_model_version(self, user, name, version):
        """
        Method to get the details of a version of the given model.
        Raises an exception if the version is not found.

        *Inputs*

            name
                Name of the model.

            version
                Version of the model.

            user
                Owner of the model.

        """
        versions = self.get_model_versions(user, name)
        for v in versions:
            if v["version"]["uuid"] == version:
                return v["version"]
        raise Exception("{}/{}/{} not found".format(user, name, version))

    def get_dataset_versions(self, user, name):
        """
        Method to get the versions of a dataset.
        Versions are returned in descending order, latest first.

        *Inputs*

            name
                Name of the dataset.

            user
                Owner of the dataset.

        """
        dataset = self.get_dataset(user, name)
        return dataset["versions"]

    def get_dataset_latest_version(self, user, name):
        """
        Method to get the latest version of the given dataset.

        *Inputs*

            name
                Name of the dataset.

            user
                Owner of the dataset.

        """
        versions = self.get_dataset_versions(user, name)
        return versions[0]["version"]

    def get_dataset_version(self, user, name, version):
        """
        Method to get the details of a version of the given dataset.
        Raises an exception if the version is not found.

        *Inputs*

            name
                Name of the dataset.

            version
                Version of the dataset.

            user
                Owner of the dataset.

        """
        versions = self.get_dataset_versions(user, name)
        for v in versions:
            if v["version"]["uuid"] == version:
                return v["version"]
        raise Exception("{}/{}/{} not found".format(user, name, version))

    def get_datascience_capabilities(self):
        """
        Method to get the datascience capabilities of the platform.
        Returns the supported frameworks, versions and the corresponding
        container image details.

        """
        return super().get_datascience_capability()

    def get_notebook_capabilities(self):
        """
        Method to get the notebook capabilities of the platform.
        Returns the supported frameworks, versions and the image details.

        """
        caps = self.get_datascience_capabilities()
        return caps["nb_ide"]["frameworks"]

    def get_r_capabilities(self):
        """
        Method to get the R language capabilities of the platform.
        Returns the supported frameworks, versions and the image details.

        """
        caps = self.get_datascience_capabilities()
        return caps["r_ide"]["frameworks"]

    def get_training_capabilities(self):
        """
        Method to get the training capabilities of the platform.
        Returns the supported frameworks, versions and the image details.

        """
        caps = self.get_datascience_capabilities()
        return caps["training"]["frameworks"]

    def get_serving_capabilities(self):
        """
        Method to get the serving capabilities of the platform.
        Returns the supported frameworks, versions and the image details.

        """
        caps = self.get_datascience_capabilities()
        return caps["serving"]["frameworks"]

    def list_frameworks(self):
        fw_opts = ["custom"]
        fws = self.get_training_capabilities()
        for fw in fws:
            for v in fw["versions"]:
                name = fw["name"] + "_" + v["name"]
                fw_opts.append(name)
        return json.dumps(fw_opts)

    def release_model(self, user, model, version=None, wait_for_completion=True):
        """
        Method to release a model to the model catalog.
        Raises Exception in case of errors.

        *Available in DKube Release: 2.2*

        *Inputs*

            model
                Name of the model.

            version
                Version of the model to be released.
                If not passed then the latest version is released automatically.

            user
                Owner of the model.

            wait_for_completion
                When set to :bash:`True` this method will wait for the release to
                finish. The release is complete once the stage of the model version
                changes to :bash:`released/failed/error`.

        """
        if version == None:
            version = self.get_model_latest_version(user, model)
            version = version["uuid"]
        super().release_model(user, model, version)
        while wait_for_completion:
            v = self.get_model_version(user, model, version)
            stage = v["model"]["stage"]
            reason = v["model"]["reason"]
            if stage.lower() in ["released", "failed", "error"]:
                print("release {}/{} - completed with state {} and reason {}".format(model, version, stage, reason))
                break
            else:
                print("release {}/{} - waiting for completion, current state {}".format(model, version, stage))
                time.sleep(self.wait_interval)

    def publish_model(
        self, name, description, details: DkubeServing, wait_for_completion=True
    ):
        """
        Method to publish a model to the model catalog.
        Raises Exception in case of errors.

        *Available in DKube Release: 2.2*

        *Inputs*

            name
                Name with which the model must be published in the model catalog.

            description
                Human readable text for the model being published.

            details
                Instance of :bash:`dkube.sdk.rsrcs.serving` class.
                Please see the :bash:`Resources` section for details on this class.

                If the serving image is not updated in the :bash:`run:DkubeServing` argument then,

                - If training used a supported standard framework, DKube will pick the appropriate serving image
                - If training used a custom image, DKube will try to use the same image for serving

                If the transformer image is not updated in :bash:`run:DkubeServing` then,

                - DKube will use the same image as the training image

                If the transformer code is not updated in :bash:`run:DkubeServing` then,

                - DKube will use the code used for training

            wait_for_completion
                When set to :bash:`True` this method will wait for the publish to
                finish. Publishing is complete once the stage of the model version
                changes to :bash:`published/failed/error`.

        """
        run = details
        user, model, version = (
            run.serving_def.owner,
            run.serving_def.model,
            run.serving_def.version,
        )
        # Fetch training run details and fill in information for serving
        if (
            run.predictor.image == None
            or (run.serving_def.transformer == True and run.transformer.image == None)
            or (
                run.serving_def.transformer == True
                and run.serving_def.transformer_project == None
            )
        ):
            if run.serving_def.version == None:
                v = self.get_model_latest_version(
                    run.serving_def.owner, run.serving_def.model
                )
                run.serving_def.version = v["uuid"]
            li = self.get_model_lineage(
                run.serving_def.owner, run.serving_def.model, run.serving_def.version
            )
            if run.predictor.image == None:
                si = li["run"]["parameters"]["generated"]["serving_image"]["image"]
                run.update_serving_image(None, si["path"], si["username"], si["password"])
            if run.serving_def.transformer == True and run.transformer.image == None:
                ti = li["run"]["parameters"]["generated"]["training_image"]["image"]
                run.update_transformer_image(ti["path"], ti["username"], ti["password"])
            if (
                run.serving_def.transformer == True
                and run.serving_def.transformer_project == None
            ):
                code = li["run"]["parameters"]["training"]["datums"]["workspace"]["data"]
                cname = code["name"].split(":")[1]
                run.update_transformer_code(cname, code["version"])
        data = {"name": name, "description": description, "serving": run.serving_def}
        super().publish_model(user, model, version, data)
        while wait_for_completion:
            v = self.get_model_version(user, model, version)
            stage = v["model"]["stage"]
            reason = v["model"]["reason"]
            if stage.lower() in ["published", "failed", "error"]:
                print("publish {}/{} - completed with state {} and reason {}".format(model, version, stage, reason))
                break
            else:
                print("publish {}/{} - waiting for completion, current state {}".format(model, version, stage))
                time.sleep(self.wait_interval)

    def create_model_deployment(
        self,
        user,
        name,
        model,
        version,
        description=None,
        stage_or_deploy="stage",
        min_replicas=0,
        max_concurrent_requests=0,
        wait_for_completion=True,
    ):
        """
        Method to create a serving deployment for a model in the model catalog.
        Raises Exception in case of errors.

        *Inputs*

            user
                Name of the user creating the deployment.

            name
                Name of the deployment. Must be unique.

            description
                User readable description of the deployment.

            model
                Name of the model to be deployed.

            version
                Version of the model to be deployed.

            stage_or_deploy
                Default is :bash:`stage`, which means to stage the model deployment
                for testing before deploying it for production.
                Change to :bash:`deploy` to deploy the model in production.

            min_replicas
                Minimum number of replicas that each Revision should have.
                If not provided, uses the value set in the platform config map.

            max_concurrent_requests
                Soft limit that specifies the maximum number of requests an
                inference pod can process at a time.
                If not provided, uses the value set in the platform config map.

            wait_for_completion
                When set to :bash:`True` this method will wait for the job to complete
                after submission. The job is declared complete once it is in one of the
                :bash:`complete/failed/error` states.

        """
        assert stage_or_deploy in [
            "stage",
            "deploy",
        ], "Invalid value for stage_or_deploy parameter."
        run = DkubeServing(user, name=name, description=description)
        run.update_serving_model(model, version=version)
        run.update_autoscaling_config(min_replicas, max_concurrent_requests)
        dkubever = self.dkubeinfo["version"]
        if pversion.parse(dkubever) < pversion.parse("2.3.0.0"):
            mcitem = self.get_modelcatalog_item(user, modelcatalog=model, version=version)
            run.update_serving_image(
                image_url=mcitem["serving"]["images"]["serving"]["image"]["path"]
            )
        else:
            mcitem = super().get_model_catalog(user, model)
            versions = mcitem["versions"]
            for v in versions:
                if v["version"] == version:
                    serving_image = v["serving"]["images"]["serving"]["image"]["path"]
                    run.update_serving_image(image_url=serving_image)
        if stage_or_deploy == "stage":
            super().stage_model(run)
        if stage_or_deploy == "deploy":
            super().deploy_model(run)
        while wait_for_completion:
            status = super().get_run("inference", run.user, run.name, fields="status")
            state, reason = status["state"], status["reason"]
            if state.lower() in ["complete", "failed", "error", "running"]:
                print("run {} - completed with state {} and reason {}".format(run.name, state, reason))
                break
            else:
                print("run {} - waiting for completion, current state {}".format(run.name, state))
                time.sleep(self.wait_interval)

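A deployment sketch grounded in this method's signature; the user and model names are placeholders::

    v = api.get_model_latest_version("demo-user", "mnist-model")
    api.create_model_deployment(
        "demo-user", "mnist-deploy", "mnist-model", v["uuid"],
        stage_or_deploy="deploy", wait_for_completion=True,
    )
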
    def delete_model_deployment(self, user, name, wait_for_completion=True):
        """
        Method to delete a model deployment.
        Raises exception if the token is of a different user, if a serving run
        with the name doesn't exist, or on any connection errors.

        *Inputs*

            user
                The token must belong to this user, as a run of a different user
                cannot be deleted.

            name
                Name of the run which needs to be deleted.

            wait_for_completion
                When set to :bash:`True` this method will wait for the deployment
                to get deleted.

        """
        data = super().get_run("inference", user, name, fields="*")
        uuid = data["job"]["parameters"]["generated"]["uuid"]
        ret = super().delete_run("inference", user, name)
        if wait_for_completion:
            self._wait_for_rundelete_completion(uuid, "inference", name)
        return ret

    def list_model_deployments(self, user, shared=False, filters="*"):
        """
        Method to list all the model deployments.
        Raises exception on any connection errors.

        *Inputs*

            user
                Name of the user.

            filters
                Only :bash:`*` is supported now. The user will be able to filter
                runs based on state or duration.

        """
        deps = []
        resp = super().list_runs("inference", user, shared)
        for item in resp:
            for inf in item["jobs"]:
                deploy = inf["parameters"]["inference"]["deploy"]
                # MAK - BUG - there is no way today from backend response to separate
                # the test-inferences vs serving deployments. So appending all.
                deps.append(inf)
        return deps

    def modelcatalog(self, user):
        """
        Method to fetch the model catalog from DKube.
        The model catalog is a list of models published by data scientists and
        ready for staging or deployment on a production cluster.
        The user must have permission to fetch the model catalog.

        *Available in DKube Release: 2.2*

        *Inputs*

            user
                Name of the user.

        """
        return super().modelcatalog(user)

    def get_modelcatalog_item(self, user, modelcatalog=None, model=None, version=None):
        """
        Method to get an item from the model catalog.
        Raises exception on any connection errors.

        *Available in DKube Release: 2.2*

        *Inputs*

            user
                Name of the user.

            modelcatalog
                Model catalog name.

            model
                Name of the model.

            version
                Version of the model.

        """
        if modelcatalog is None and model is None:
            return "either model catalog name or model name should be provided"
        if version is None:
            return "Model Version must be provided"
        if modelcatalog:
            mc = self.modelcatalog(user)
            for item in mc:
                if item["name"] == modelcatalog:
                    for iversion in item["versions"]:
                        if iversion["model"]["version"] == version:
                            return iversion
            raise Exception("{}.{} not found in model catalog".format(modelcatalog, version))
        else:
            mc = self.modelcatalog(user)
            for item in mc:
                if item["model"]["name"] == model:
                    for iversion in item["versions"]:
                        if iversion["model"]["version"] == version:
                            return iversion
            raise Exception("{}.{} not found in model catalog".format(model, version))

    def delete_modelcatalog_item(
        self, user, modelcatalog=None, model=None, version=None
    ):
        """
        Method to delete an item from the model catalog.
        Raises exception on any connection errors.

        *Available in DKube Release: 2.2*

        *Inputs*

            user
                Name of the user.

            modelcatalog
                Model catalog name.

            model
                Name of the model.

            version
                Version of the model.

        """
        if modelcatalog is None and model is None:
            return "either model catalog name or model name should be provided"
        if version is None:
            return "Model Version must be provided"
        if modelcatalog:
            response = self._api.delete_model_catalog_item(user, modelcatalog, version)
            return response
        else:
            mc = self.modelcatalog(user)
            for item in mc:
                if item["model"]["name"] == model:
                    modelcatalog = item["name"]
                    response = self._api.delete_model_catalog_item(
                        user, modelcatalog, version
                    )
                    return response
            raise Exception("{}.{} not found in model catalog".format(model, version))

    def list_projects(self):
        """
        Return the list of DKube projects.

        *Available in DKube Release: 2.2*

        """
        response = self._api.get_all_projects().to_dict()
        assert response["response"]["code"] == 200, response["response"]["message"]
        return response["data"]

    def create_project(self, project: DkubeProject):
        """
        Creates a DKube project.

        *Available in DKube Release: 2.2*

        *Inputs*

            project
                Instance of :bash:`dkube.sdk.rsrcs.DkubeProject` class.

        """
        assert (
            type(project) == DkubeProject
        ), "Invalid type for project, value must be instance of rsrcs:DkubeProject class"
        response = self._api.create_project(project).to_dict()
        assert response["response"]["code"] == 200, response["response"]["message"]
        return response["data"]

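A project lifecycle sketch; only the :bash:`DkubeProject(name)` constructor form is assumed::

    # Assumption: DkubeProject(name) constructor.
    proj = DkubeProject("mnist")
    api.create_project(proj)
    pid = api.get_project_id("mnist")
    api.set_active_project(pid)   # subsequent resources are tagged with this project
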
    def update_project(self, project_id, project: DkubeProject):
        """
        Update project details.

        *Available in DKube Release: 2.2*

        .. note:: The details and evail_details fields are base64 encoded.

        *Inputs*

            project_id
                ID of the project.

            project
                Instance of :bash:`dkube.sdk.rsrcs.DkubeProject` class.

        """
        assert (
            type(project) == DkubeProject
        ), "Invalid type for project, value must be instance of rsrcs:DkubeProject class"
        project.id = project_id
        response = self._api.update_one_project(
            project_id=project.id, data=project
        ).to_dict()
        assert response["code"] == 200, response["message"]

    def get_project_id(self, name):
        """
        Get the project ID from the project name.

        *Available in DKube Release: 2.2*

        *Inputs*

            name
                Name of the project.

        """
        response = self._api.get_all_projects().to_dict()
        assert response["response"]["code"] == 200, response["response"]["message"]
        for project in response["data"]:
            if project["name"] == name:
                return project["id"]
        return None

    def get_project(self, project_id):
        """
        Get project details.

        *Available in DKube Release: 2.2*

        *Inputs*

            project_id
                ID of the project.

        """
        response = self._api.get_one_project(project_id).to_dict()
        assert response["response"]["code"] == 200, response["response"]["message"]
        return response["data"]

    def get_leaderboard(self, project_id):
        """
        Get the project's leaderboard details.

        *Available in DKube Release: 2.2*

        *Inputs*

            project_id
                ID of the project.

        """
        response = self._api.get_all_project_submissions(project_id).to_dict()
        assert response["response"]["code"] == 200, response["response"]["message"]
        return response["data"]

    def delete_project(self, project_id):
        """
        Delete a project. This only deletes the project and not the associated
        resources.

        *Available in DKube Release: 2.2*

        *Inputs*

            project_id
                ID of the project.

        """
        project_ids = {"project_ids": [project_id]}
        response = self._api.projects_delete_list(project_ids).to_dict()
        assert response["code"] == 200, response["message"]

    def upload_model(
        self, user, name, filepath, extract=False, wait_for_completion=True
    ):
        """
        Upload a model. This creates a model and uploads the file residing on
        your local workstation. Supported formats are tar, gz, tar.gz, tgz,
        zip, csv and txt.

        *Available in DKube Release: 2.2*

        *Inputs*

            user
                Name of the user under which the model is to be created in DKube.

            name
                Name of the model to be created in DKube.

            filepath
                Path of the file to be uploaded.

            extract
                If set to True, the file will be extracted after upload.

            wait_for_completion
                When set to :bash:`True` this method will wait for the model resource
                to get into one of the complete states. The model is declared complete
                once it is in one of the :bash:`complete/failed/error` states.

        """
        upl_resp = super().upload_model(user, name, filepath, extract=extract)
        while wait_for_completion:
            status = super().get_repo("model", user, name, fields="status")
            state, reason = status["state"], status["reason"]
            if state.lower() in ["ready", "failed", "error"]:
                print("model {} - completed with state {} and reason {}".format(name, state, reason))
                break
            else:
                print("model {} - waiting for completion, current state {}".format(name, state))
                time.sleep(self.wait_interval)

[docs] def download_dataset(self, path, user, name, version=None): """This method is to download a version of dataset. Downloaded content will be copied in the specified path. *Inputs* path Target path where the dataset must be downloaded. user name of user who owns the dataset. name name of dataset. version version of the dataset. """ if version == None: version = self.get_dataset_latest_version(user, name) version = version["uuid"] super().download_dataset(path, user, name, version)
[docs]    def download_model(self, path, user, name, version=None):
        """This method is to download a version of model.
        Downloaded content will be copied in the specified path.

        *Inputs*

            path
                Target path where the model must be downloaded.

            user
                name of user who owns the model.

            name
                name of model.

            version
                version of the model.

        """
        if version == None:
            version = self.get_model_latest_version(user, name)
            version = version["uuid"]
        super().download_model(path, user, name, version)
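    # Example usage (sketch; names and paths are hypothetical). Omitting the
    # version argument downloads the latest version of the repo:
    #
    #   api.download_dataset("/tmp/data", "oneconv", "insurance-data")
    #   api.download_model("/tmp/model", "oneconv", "insurance-model")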
    def _wait_for_rundelete_completion(self, uuid, _class, name):
        dkubever = self.dkubeinfo["version"]
        # MAK - ideally the target version should be 2.2.7.0 and it should
        # be sufficient to check for older release(s), but there is an internal
        # patch to enable the automation suite which returns release version as 2.2.1.13
        if pversion.parse(dkubever) < pversion.parse("2.2.1.13"):
            # Older release - waiting for deletion to complete is not supported
            return
        try:
            # MAK - try block can be removed, once 2.2.7.0 is released
            while True:
                data = super().get_run_byuuid(uuid)
                state = data["parameters"]["generated"]["status"]["sub_state"]
                if state == None:
                    # MAK - check can be removed once 2.2.7.0 is released
                    # Older release - ignore the error
                    break
                if state.lower() == "deleting":
                    print(
                        "{} {} - waiting for deletion, current state {}".format(
                            _class, name, state
                        )
                    )
                    time.sleep(self.wait_interval)
                elif state.lower() == "deleted":
                    print("{} {} - deleted successfully".format(_class, name))
                    break
        except ApiException as ae:
            if ae.status == 404:
                # Older release - ignore the error
                print(
                    "ignoring 404 - fetching deleted jobs failed, older release of dkube"
                )
            else:
                raise ae
[docs] def list_inference_endpoints(self): """ Method to list all the inferences in the dkube cluster. Raises exception on any connection errors. """ return super().list_inference_endpoints()
[docs]    def list_cicd_images(self, repo=None):
        """
        Method to list all the CICD images, plus any images manually added in DKube.

        *Inputs*

            repo
                Git repo URL. If provided, only returns images generated for that repo

        """
        response = self._api.get_all_cicd_images().to_dict()
        assert response["response"]["code"] == 200, response["response"]["message"]
        if repo is None:
            return response["data"]
        images = []
        for entry in response["data"]:
            if "repo" in entry["image"] and entry["image"]["repo"] == repo:
                images.append(entry)
        return images
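    # Example usage (sketch; the repo URL is a hypothetical placeholder):
    #
    #   all_images = api.list_cicd_images()
    #   repo_images = api.list_cicd_images(repo="https://github.com/org/app")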
    ### Model monitor APIs ##########
[docs]    def modelmonitor_create(
        self, modelmonitor: DkubeModelmonitor, wait_for_completion=True
    ):
        """
        Method to create Model Monitor on DKube

        *Available in DKube Release: 3.0*

        *Inputs*

            modelmonitor
                Instance of :bash:`dkube.sdk.rsrcs.modelmonitor.DkubeModelmonitor` class.
                Please see the :bash:`Resources` section for details on this class.

            wait_for_completion
                When set to :bash:`True` this method will wait for modelmonitor resource
                to get into one of the complete states.
                modelmonitor is declared complete if it is one of the :bash:`init/ready/error` state

        *Outputs*

            a dictionary object with response status

        """
        assert (
            type(modelmonitor) == DkubeModelmonitor
        ), "Invalid type for model monitor, value must be instance of rsrcs:DkubeModelmonitor class"
        response = super().create_model_monitor(modelmonitor)
        while wait_for_completion:
            mm_config = super().get_modelmonitor_configuration(response["uuid"])
            state = mm_config["status"]["state"]
            if state.lower() in ["init", "ready", "error"]:
                print(
                    "ModelMonitor {} - completed with state {} and reason {}".format(
                        modelmonitor.name, state, response["message"]
                    )
                )
                break
            else:
                print(
                    "ModelMonitor {} - waiting for completion, current state {}".format(
                        modelmonitor.name, state
                    )
                )
                time.sleep(self.wait_interval)
        return response
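    # Example usage (sketch; the model name and description are hypothetical;
    # the model_name/description kwargs mirror the DkubeModelmonitor usage in
    # modelmonitor_update_schema below):
    #
    #   mm = DkubeModelmonitor(model_name="insurance-model",
    #                          description="drift monitor")
    #   resp = api.modelmonitor_create(mm)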
[docs] def modelmonitor_list(self, **kwargs): """ Method to list the modelmonitors. *Available in DKube Release: 3.0* *Inputs* tags string page integer archived boolean, when archived=True, list the archived modelmonitors *Outputs* A list containing the modelmonitors """ tags = kwargs.get("tags") page = kwargs.get("page") archived = kwargs.get("archived", False) query_params = {} if tags: query_params["tags"] = tags if page: query_params["page"] = page if archived: query_params["archived"] = archived return super().list_modelmonitor(query_params)
[docs]    def modelmonitor_get_id(self, name=None):
        """
        Method to get the id of a model monitor.

        *Available in DKube Release: 3.0*

        *Inputs*

            name
                Name of the modelmonitor

        *Outputs*

            The uuid of the modelmonitor

        """
        response = super().get_modelmonitor_id(name).to_dict()["data"]
        if response != None:
            return response.get(name)
        else:
            return None
[docs]    def modelmonitor_get_alertid(self, id=None, alert_name=None):
        """
        Method to get the alert id of a modelmonitor.

        *Available in DKube Release: 3.0*

        *Inputs*

            id
                Modelmonitor Id

            alert_name
                Name of the alert

        *Outputs*

            The id of the alert

        """
        response = super().get_modelmonitor_alerts(id)
        for alert in response:
            if alert["name"] == alert_name:
                return alert["id"]
        return None
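    # Example usage (sketch; monitor and alert names are hypothetical):
    #
    #   mm_id = api.modelmonitor_get_id("insurance-monitor")
    #   alert_id = api.modelmonitor_get_alertid(id=mm_id,
    #                                           alert_name="feature-drift")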
[docs] def modelmonitor_get(self, id=None): """ Method to get the modelmonitor. *Available in DKube Release: 3.0* *Inputs* id Modelmonitor Id *Outputs* A dictionary containing the configuration of the modelmonitor """ return super().get_modelmonitor_configuration(id)
[docs]    def modelmonitor_get_datasets(self, id=None, data_class: DatasetClass = None):
        """
        Method to get the datasets of the modelmonitor.

        *Available in DKube Release: 3.0*

        *Inputs*

            id
                Modelmonitor Id

            data_class
                data class of the dataset in the modelmonitor;
                must be one of ["TrainData","PredictData","LabelledData"],
                by default set to None

        *Outputs*

            if data_class is None:
                A list of dictionaries containing all the datasets information.

            if data_class is 'PredictData' or 'LabelledData' or 'TrainData':
                An individual data dictionary for that data class.

        """
        datasets = super().get_modelmonitor_dataset(id)
        if data_class == None:
            return datasets
        else:
            for data in datasets:
                if data["_class"] == data_class:
                    return data
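    # Example usage (sketch): fetch all datasets, or just the training data.
    #
    #   all_data = api.modelmonitor_get_datasets(id=mm_id)
    #   train = api.modelmonitor_get_datasets(id=mm_id, data_class="TrainData")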
[docs] def modelmonitor_get_alerts(self, id=None): """ Method to get the alerts of the modelmonitor. *Available in DKube Release: 3.0* *Inputs* id Modelmonitor Id *Outputs* a list of dictionaries containing individual alerts information """ return super().get_modelmonitor_alerts(id)
[docs]    def modelmonitors_delete(self, ids=[]):
        """
        Method to delete multiple modelmonitors.

        *Available in DKube Release: 3.0*

        *Inputs*

            ids
                List of modelmonitor Ids to be deleted. Example: ["cd123","345fg"]

        *Outputs*

            A dictionary object with response status

        """
        return super().delete_modelmonitors(ids)
[docs]    def modelmonitor_delete(self, id=None):
        """
        Method to delete a single modelmonitor.

        *Available in DKube Release: 3.0*

        *Inputs*

            id
                Modelmonitor Id

        *Outputs*

            A dictionary object with response status

        """
        return super().delete_modelmonitors([id])
[docs]    def modelmonitor_get_metricstemplate(self):
        """
        Method to get the metrics supported for the modelmonitor.

        *Available in DKube Release: 3.0*

        *Outputs*

            a list of dictionaries containing metrics template for Regression and Classification

        """
        return super().get_modelmonitor_template()
[docs]    def modelmonitor_delete_alert(self, id=None, alert_id=None):
        """
        Method to delete an alert in the modelmonitor

        *Available in DKube Release: 3.0*

        *Inputs*

            id
                Modelmonitor Id

            alert_id
                Id of a modelmonitor alert

        """
        return super().delete_modelmonitor_alert(id, [alert_id])
[docs]    def modelmonitor_add_alert(
        self, id=None, alert_data: DkubeModelmonitoralert = None
    ):
        """
        Method to add the alerts in the modelmonitor

        *Available in DKube Release: 3.0*

        *Inputs*

            id
                Modelmonitor Id

            alert_data
                Instance of :bash:`dkube.sdk.rsrcs.modelmonitor.DkubeModelmonitoralert` class.
                Please see the :bash:`Resources` section for details on this class.

        *Outputs*

            a dictionary object with response status

        """
        if self.modelmonitor_get_datasets(
            id=id, data_class="TrainData"
        ) and self.modelmonitor_get_datasets(id=id, data_class="PredictData"):
            alert_dict = json.loads(alert_data.to_JSON())
            alert_dict["class"] = alert_dict.pop("_class")
            response = super().modelmonitor_addalert(id, {"data": [alert_dict]})
            return response
        else:
            print("Add train and predict data before adding alerts")
            return None
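    # Example usage (sketch; assumes DkubeModelmonitoralert accepts a name
    # kwarg, which is an assumption of this example, and that train and
    # predict data have already been added to the monitor):
    #
    #   alert = DkubeModelmonitoralert(name="feature-drift")
    #   api.modelmonitor_add_alert(id=mm_id, alert_data=alert)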
[docs]    def modelmonitor_add_dataset(
        self, id=None, data: DkubeModelmonitordataset = None, wait_for_completion=True
    ):
        """
        Method to add the dataset in the modelmonitor

        *Available in DKube Release: 3.0*

        *Inputs*

            id
                Modelmonitor Id

            data
                Instance of :bash:`dkube.sdk.rsrcs.modelmonitor.DkubeModelmonitordataset` class.
                Please see the :bash:`Resources` section for details on this class.

            wait_for_completion
                When set to :bash:`True` this method will wait for the modelmonitor
                resource to get into one of the complete states and then add the
                datasets. The modelmonitor is declared complete if it is in one of
                the :bash:`init/ready/error` states.

        *Outputs*

            a dictionary object with response status

        """
        data_dict = json.loads(data.to_JSON())
        data_dict["class"] = data_dict.pop("_class")
        response = super().modelmonitor_adddataset(id, {"data": data_dict})
        while wait_for_completion:
            mm_config = super().get_modelmonitor_configuration(id)
            state = mm_config["status"]["state"]
            if state.lower() in ["init", "ready", "error"]:
                break
            else:
                print(
                    "Modelmonitor add dataset not completed yet, current state {}".format(
                        state
                    )
                )
                time.sleep(self.wait_interval)
        return response["response"]
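    # Example usage (sketch; assumes DkubeModelmonitordataset accepts a name
    # kwarg, which is an assumption of this example):
    #
    #   data = DkubeModelmonitordataset(name="insurance-train-data")
    #   api.modelmonitor_add_dataset(id=mm_id, data=data)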
[docs]    def modelmonitor_archive(self, id=None):
        """
        Method to archive the modelmonitor

        *Available in DKube Release: 3.0*

        *Inputs*

            id
                Modelmonitor Id

        *Outputs*

            a dictionary object with response status

        """
        return super().modelmonitor_archive(id, archive=True)
[docs]    def modelmonitor_unarchive(self, id=None):
        """
        Method to unarchive the modelmonitor

        *Available in DKube Release: 3.0*

        *Inputs*

            id
                Modelmonitor Id

        *Outputs*

            a dictionary object with response status

        """
        return super().modelmonitor_archive(id, archive=False)
[docs]    def modelmonitor_start(self, id=None, wait_for_completion=True):
        """
        Method to start the modelmonitor

        *Available in DKube Release: 3.0*

        *Inputs*

            id
                Modelmonitor Id

            wait_for_completion
                When set to :bash:`True` this method will wait for the start to
                complete. The start is declared complete once the modelmonitor
                reaches one of the :bash:`init/active/error` states; in the
                :bash:`active` state the modelmonitor is running.

        *Outputs*

            a dictionary object with response status

        """
        response = super().modelmonitor_state(id, "start")
        while wait_for_completion:
            mm_state = self.modelmonitor_get(id=id)["status"]["state"]
            if mm_state.lower() in ["init", "active", "error"]:
                break
            else:
                print("ModelMonitor {} - is in {} state".format(id, mm_state))
                time.sleep(self.wait_interval)
        return response
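    # Example usage (sketch): start a monitor, then stop it later.
    #
    #   api.modelmonitor_start(id=mm_id)
    #   ...
    #   api.modelmonitor_stop(id=mm_id)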
[docs]    def modelmonitor_stop(self, id=None):
        """
        Method to stop the modelmonitor

        *Available in DKube Release: 3.0*

        *Inputs*

            id
                Modelmonitor Id

        *Outputs*

            a dictionary object with response status

        """
        return super().modelmonitor_state(id, "stop")
[docs]    def modelmonitor_update_dataset(
        self,
        id=None,
        data_class: DatasetClass = None,
        data: DkubeModelmonitordataset = None,
        wait_for_completion=True,
    ):
        """
        Method to update the modelmonitor dataset

        *Available in DKube Release: 3.0*

        *Inputs*

            id
                Modelmonitor Id

            data
                Instance of :bash:`dkube.sdk.rsrcs.modelmonitor.DkubeModelmonitordataset` class.
                Please see the :bash:`Resources` section for details on this class.

            data_class
                Instance of :bash:`dkube.sdk.rsrcs.modelmonitor.DatasetClass` class.
                Enum = ["TrainData","PredictData","LabelledData"]
                Please see the :bash:`Resources` section for details on this class.

            wait_for_completion
                When set to :bash:`True` this method will wait for the modelmonitor
                resource to get into one of the complete states before updating the
                datasets. The modelmonitor is declared complete if it is in one of
                the :bash:`init/ready/error` states. Dataset updates are not
                allowed while the modelmonitor is in the active state.

        *Outputs*

            a dictionary object with response status

        """
        data_id = self.modelmonitor_get_datasets(id, data_class=data_class)["id"]
        data_dict = json.loads(data.to_JSON())
        for k in list(data_dict.keys()):
            if data_dict[k] == None and k != "_class":
                del data_dict[k]
        data_dict["class"] = data_dict["_class"]
        if data_dict["class"] == None:
            data_dict["class"] = data_class
        response = super().update_modelmonitor_dataset(id, data_id, data_dict)
        while wait_for_completion:
            mm_state = self.modelmonitor_get(id=id)["status"]["state"]
            if mm_state.lower() in ["init", "error", "ready"]:
                break
            else:
                print("ModelMonitor {} - is in {} state".format(id, mm_state))
                time.sleep(self.wait_interval)
        return response
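    # Example usage (sketch; assumes the same hypothetical
    # DkubeModelmonitordataset constructor as in the add-dataset example):
    #
    #   new_data = DkubeModelmonitordataset(name="insurance-train-v2")
    #   api.modelmonitor_update_dataset(id=mm_id, data_class="TrainData",
    #                                   data=new_data)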
[docs]    def modelmonitor_update_alert(
        self, id=None, alert: DkubeModelmonitoralert = None, alert_id=None
    ):
        """
        Method to update the modelmonitor alert

        *Available in DKube Release: 3.0*

        *Inputs*

            id
                Modelmonitor Id

            alert
                Instance of :bash:`dkube.sdk.rsrcs.modelmonitor.DkubeModelmonitoralert` class.
                Please see the :bash:`Resources` section for details on this class.

            alert_id
                ID of the alert you want to update in the modelmonitor

        *Outputs*

            a dictionary object with response status

        """
        alert_dict = json.loads(alert.to_JSON())
        alert_dict["class"] = alert_dict.pop("_class")
        return super().update_modelmonitor_alert(id, alert_id, alert_dict)
[docs]    def modelmonitor_update(
        self, id=None, config: DkubeModelmonitor = None, wait_for_completion=True
    ):
        """
        Method to update the modelmonitor

        *Available in DKube Release: 3.0*

        *Inputs*

            id
                Modelmonitor Id

            config
                Instance of :bash:`dkube.sdk.rsrcs.modelmonitor.DkubeModelmonitor` class.
                Please see the :bash:`Resources` section for details on this class
                to check what can be updated.

        *Outputs*

            a dictionary object with response status

        """
        config_dict = config.__dict__["modelmonitor"].__dict__
        config_dict = {k.replace("_", "", 1): v for k, v in config_dict.items()}
        # Fields removed from the payload before issuing the update request
        rem_list = [
            "datasets",
            "model",
            "alerts",
            "performance_metrics_template",
            "updated_at",
            "id",
            "drift_detection_algorithm",
            "created_at",
            "pipeline_component",
            "status",
            "owner",
            "name",
            "discriminator",
        ]
        for key in rem_list:
            config_dict.pop(key, None)
        for k in list(config_dict.keys()):
            if config_dict[k] == None or config_dict[k] == []:
                del config_dict[k]
        response = super().update_modelmonitor_config(id, config_dict)
        while wait_for_completion:
            mm_state = self.modelmonitor_get(id=id)["status"]["state"]
            if mm_state.lower() in ["init", "error", "ready"]:
                break
            else:
                print("ModelMonitor {} - is in {} state".format(id, mm_state))
                time.sleep(self.wait_interval)
        return response
[docs]    def modelmonitor_update_schema(
        self,
        id=None,
        label=None,
        selected=True,
        schema_class="Categorical",
        schema_type="InputFeature",
    ):
        """
        Method to update the schema in the modelmonitor

        *Available in DKube Release: 3.0*

        *Inputs*

            id
                Modelmonitor Id

            label
                feature in the schema to be updated

            selected
                boolean (True or False), by default True

            schema_class
                class of the schema (Categorical, Continuous)

            schema_type
                type of schema (InputFeature, PredictionOutput, Rowid, Timestamp)

        *Outputs*

            a dictionary object with response status

        """
        if not label:
            print("Specify a valid column label")
            return None
        found = False
        config = self.modelmonitor_get(id=id)
        try:
            for feature in config["schema"]["features"]:
                if feature["label"] == label:
                    feature["_class"] = schema_class
                    feature["type"] = schema_type
                    feature["selected"] = selected
                    found = True
            if not found:
                print("specified label is not in the derived schema")
                return None
            for d in config["schema"]["features"]:
                d["class"] = d.pop("_class")
            mm = DkubeModelmonitor(
                model_name=config["model"], description=config["description"]
            )
            mm.__dict__["modelmonitor"].__dict__["_schema"] = config["schema"]
            return self.modelmonitor_update(id, mm)
        except TypeError:
            print("Schema is Null")
            return
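    # Example usage (sketch; "age" is a hypothetical feature label): mark a
    # feature as a continuous input feature in the derived schema.
    #
    #   api.modelmonitor_update_schema(id=mm_id, label="age",
    #                                  schema_class="Continuous",
    #                                  schema_type="InputFeature")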