"""
.. module:: DKubeAPI
:synopsis: Helper class which provides high level methods for user to integrate at workflow level.
.. moduleauthor:: Ahmed Khan <github.com/mak-454>
"""
import json
import os
import time
import pandas as pd
import urllib3
from dkube.sdk.internal.api_base import *
from dkube.sdk.internal.dkube_api.models.conditions import \
Conditions as TriggerCondition
from dkube.sdk.internal.dkube_api.rest import ApiException
from dkube.sdk.internal.files_base import *
from dkube.sdk.rsrcs import *
from dkube.sdk.rsrcs.featureset import DkubeFeatureSet, DKubeFeatureSetUtils
from dkube.sdk.rsrcs.modelmonitor import (DatasetClass, DkubeModelmonitoralert,
DkubeModelmonitordataset)
from dkube.sdk.rsrcs.project import DkubeProject
from packaging import version as pversion
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class DkubeApi(ApiBase, FilesBase):
"""
This class encapsulates all the high-level DKube workflow functions::
from dkube.sdk import *
dapi = DkubeApi()
*Inputs*
URL
FQDN endpoint at which DKube platform is deployed::
http://dkube-controller-master.dkube.svc.cluster.local:5000
https://dkube.ai:32222
.. note:: If not provided then the value is picked from *DKUBE_ACCESS_URL* env variable. If that is also not defined then http://dkube-controller-master.dkube.svc.cluster.local:5000 is used, assuming the access is internal to the DKube cluster.
token
Access token for the APIs, without which DKube will return 40x codes
.. note:: If not provided then the value is picked from *DKUBE_ACCESS_TOKEN* env variable. Asserts if the env variable is also not defined.
common_tags
Tags which need to be applied to all the resources created using this API object
req_timeout
Timeout for all the requests which are issued using this API object
req_retries
Number of retries per request
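*Example*
A minimal sketch of constructing the client; the URL and token values are placeholders::
    dapi = DkubeApi(
        URL="https://dkube.ai:32222",
        token="<jwt-token>",
        common_tags=["team:mlops"],
    )
    dapi.validate_token()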
"""
def __init__(
self, URL=None, token=None, common_tags=[], req_timeout=None, req_retries=None
):
self.url = URL
if self.url is None:
self.url = os.getenv(
"DKUBE_ACCESS_URL",
"http://dkube-controller-master.dkube.svc.cluster.local:5000",
)
self.files_url = os.getenv(
"DKUBE_ACCESS_URL",
"http://dkube-downloader.dkube.svc.cluster.local:9401",
)
else:
self.files_url = self.url
self.token = token
if self.token is None:
    self.token = os.getenv("DKUBE_ACCESS_TOKEN", None)
assert (
    self.token is not None
), "TOKEN must be specified either by passing argument or by setting DKUBE_ACCESS_TOKEN env variable"
ApiBase.__init__(self, self.url, self.token, common_tags)
FilesBase.__init__(self, self.files_url, self.token)
self.dkubeinfo = super().dkubeinfo()
def set_active_project(self, project_id):
"""
Set active project. Any resources created using this API instance will belong to the given project.
*Available in DKube Release: 2.2*
*Inputs*
project_id
ID of the project. Pass None to unset.
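*Example*
A minimal sketch (assumes a :bash:`DkubeApi` instance :bash:`dapi` and an existing project)::
    pid = dapi.get_project_id("cats-vs-dogs")
    dapi.set_active_project(pid)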
"""
self.common_tags = [
tag for tag in self.common_tags if not tag.startswith("project:")
]
if project_id:
self.common_tags.append("project:" + str(project_id))
def validate_token(self):
"""
Method which can be used to validate the token.
Returns the JWT Claims. Which contains the role assigned to the user.
"""
return super().validate_token()
def launch_jupyter_ide(self, ide: DkubeIDE, wait_for_completion=True):
"""
Method to launch a Jupyter IDE on DKube platform. Two kinds of IDE are supported,
Jupyter Notebook & RStudio.
Raises Exception in case of errors.
*Inputs*
ide
Instance of :bash:`dkube.sdk.rsrcs.DkubeIDE` class.
Please see the :bash:`Resources` section for details on this class.
wait_for_completion
When set to :bash:`True` this method will wait for job to complete after submission.
IDE is declared complete once it reaches one of the :bash:`running/failed/error` states
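*Example*
A minimal sketch; the :bash:`DkubeIDE` constructor arguments shown are assumptions, see the :bash:`Resources` section for the full API::
    ide = DkubeIDE("oneconv", name="my-notebook")
    dapi.launch_jupyter_ide(ide, wait_for_completion=True)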
"""
assert (
type(ide) == DkubeIDE
), "Invalid type for run, value must be instance of rsrcs:DkubeIDE class"
super().launch_jupyter_ide(ide)
while wait_for_completion:
status = super().get_ide("notebook", ide.user, ide.name, fields="status")
state, reason = status["state"], status["reason"]
if state.lower() in ["running", "failed", "error"]:
print(
"IDE {} - completed with state {} and reason {}".format(
ide.name, state, reason
)
)
break
else:
print(
"IDE {} - waiting for completion, current state {}".format(
ide.name, state
)
)
time.sleep(self.wait_interval)
def launch_rstudio_ide(self, ide: DkubeIDE, wait_for_completion=True):
"""
Method to launch an RStudio IDE on the DKube platform. Two kinds of IDE are supported,
Jupyter Notebook & RStudio.
Raises Exception in case of errors.
*Inputs*
ide
Instance of :bash:`dkube.sdk.rsrcs.DkubeIDE` class.
Please see the :bash:`Resources` section for details on this class.
wait_for_completion
When set to :bash:`True` this method will wait for job to complete after submission.
IDE is declared complete once it reaches one of the :bash:`running/failed/error` states
"""
assert (
type(ide) == DkubeIDE
), "Invalid type for run, value must be instance of rsrcs:DkubeIDE class"
super().launch_rstudio_ide(ide)
while wait_for_completion:
status = super().get_ide("notebook", ide.user, ide.name, fields="status")
state, reason = status["state"], status["reason"]
if state.lower() in ["running", "failed", "error"]:
print(
"IDE {} - completed with state {} and reason {}".format(
ide.name, state, reason
)
)
break
else:
print(
"IDE {} - waiting for completion, current state {}".format(
ide.name, state
)
)
time.sleep(self.wait_interval)
def list_ides(self, user, shared=False, filters="*"):
"""
Method to list all the IDEs of a user.
Raises exception on any connection errors.
*Inputs*
user
User whose IDE instances must be fetched.
If the token belongs to a different user, then the token should have permission to fetch the
IDE instances of the :bash:`user` in the input. They should be in the same DKube group.
filters
Only :bash:`*` is supported now.
Users will be able to filter runs based on state or duration.
"""
return super().list_ides("notebook", user, shared)
def delete_ide(self, user, name, wait_for_completion=True):
"""
Method to delete an IDE.
Raises exception if the token belongs to a different user, if no IDE with the given name exists, or on any connection errors.
*Inputs*
user
The token must belong to this user, as the IDE instance of a different user cannot be deleted.
name
Name of the IDE which needs to be deleted.
wait_for_completion
When set to :bash:`True` this method will wait for the IDE to get deleted.
"""
data = super().get_ide("notebook", user, name, fields="*")
uuid = data["job"]["parameters"]["generated"]["uuid"]
ret = super().delete_ide("notebook", user, name)
if wait_for_completion:
self._wait_for_rundelete_completion(uuid, "notebook", name)
return ret
def create_training_run(self, run: DkubeTraining, wait_for_completion=True):
"""
Method to create a training run on DKube.
Raises Exception in case of errors.
*Inputs*
run
Instance of :bash:`dkube.sdk.rsrcs.Training` class.
Please see the :bash:`Resources` section for details on this class.
wait_for_completion
When set to :bash:`True` this method will wait for job to complete after submission.
Job is declared complete once it reaches one of the :bash:`complete/failed/error` states
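*Example*
A minimal sketch; the repo names, framework string and :bash:`DkubeTraining` helper methods shown are assumptions, see the :bash:`Resources` section for the full API::
    training = DkubeTraining("oneconv", name="mnist-train")
    training.update_container(framework="tensorflow_2.0.0", image_url="ocdr/d3-datascience-tf-cpu:v2.0.0")
    training.update_startupscript("python model.py")
    training.add_code("mnist-code")
    training.add_output_model("mnist-model", mountpath="/model")
    dapi.create_training_run(training)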
"""
assert (
type(run) == DkubeTraining
), "Invalid type for run, value must be instance of rsrcs:DkubeTraining class"
valid_fw = False
fw_opts = ["custom"]
if run.executor_dkube_framework.choice == "custom":
valid_fw = True
else:
fws = self.get_training_capabilities()
for fw in fws:
for v in fw["versions"]:
if (
run.executor_dkube_framework.choice == fw["name"]
and run.dkube_framework_details.version == v["name"]
):
valid_fw = True
break
else:
name = fw["name"] + "_" + v["name"]
fw_opts.append(name)
if valid_fw:
    break
assert valid_fw, (
    "Invalid choice for framework, select oneof(" + str(fw_opts) + ")"
)
super().update_tags(run.training_def)
super().create_run(run)
while wait_for_completion:
status = {}
try:
status = super().get_run(
"training", run.user, run.name, fields="status"
)
except ValueError as ve:
ve_without_num = "".join(i for i in str(ve) if not i.isdigit())
if "Invalid value for `state` (Waiting for gpu(s))" in ve_without_num:
num = "".join(i for i in str(ve) if i.isdigit())
status["state"] = "Waiting for {} gpu(s)".format(num)
status["reason"] = ""
else:
raise ve
state, reason = status["state"], status["reason"]
if state.lower() in ["complete", "failed", "error", "stopped", "created"]:
print(
"run {} - completed with state {} and reason {}".format(
run.name, state, reason
)
)
break
else:
print(
"run {} - waiting for completion, current state {}".format(
run.name, state
)
)
time.sleep(self.wait_interval)
def get_training_run(self, user, name):
"""
Method to fetch the training run with given name for the given user.
Raises exception if the run is not found or on any other connection errors.
*Inputs*
user
User whose training run has to be fetched.
If the token belongs to a different user, then the token should have permission to fetch the
training run of the :bash:`user` in the input. They should be in the same DKube group.
name
Name of the training run to be fetched
"""
return super().get_run("training", user, name)
def list_training_runs(self, user, shared=False, filters="*"):
"""
Method to list all the training runs of a user.
Raises exception on any connection errors.
*Inputs*
user
User whose training runs must be fetched.
If the token belongs to a different user, then the token should have permission to fetch the
training runs of the :bash:`user` in the input. They should be in the same DKube group.
filters
Only :bash:`*` is supported now.
Users will be able to filter runs based on state or duration.
"""
return super().list_runs("training", user, shared)
def delete_training_run(self, user, name, wait_for_completion=True):
"""
Method to delete a run.
Raises exception if the token belongs to a different user, if no training run with the given name exists, or on any connection errors.
*Inputs*
user
The token must belong to this user, as a run of a different user cannot be deleted.
name
Name of the run which needs to be deleted.
wait_for_completion
When set to :bash:`True` this method will wait for the training run to get deleted.
"""
data = super().get_run("training", user, name, fields="*")
uuid = data["job"]["parameters"]["generated"]["uuid"]
ret = super().delete_run("training", user, name)
if wait_for_completion:
self._wait_for_rundelete_completion(uuid, "training", name)
return ret
def create_preprocessing_run(
self, run: DkubePreprocessing, wait_for_completion=True
):
"""
Method to create a preprocessing run on DKube.
Raises Exception in case of errors.
*Inputs*
run
Instance of :bash:`dkube.sdk.rsrcs.Preprocessing` class.
Please see the :bash:`Resources` section for details on this class.
wait_for_completion
When set to :bash:`True` this method will wait for job to complete after submission.
Job is declared complete once it reaches one of the :bash:`complete/failed/error` states
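*Example*
A minimal sketch; the helper method names mirror :bash:`DkubeTraining` and are assumptions::
    preproc = DkubePreprocessing("oneconv", name="mnist-preprocess")
    preproc.update_container(image_url="ocdr/dkube-datascience-tf-cpu:v2.0.0")
    preproc.update_startupscript("python preprocess.py")
    dapi.create_preprocessing_run(preproc)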
"""
assert (
type(run) == DkubePreprocessing
), "Invalid type for run, value must be instance of rsrcs:DkubePreprocessing class"
super().update_tags(run.pp_def)
super().create_run(run)
while wait_for_completion:
status = {}
try:
status = super().get_run(
"preprocessing", run.user, run.name, fields="status"
)
except ValueError as ve:
ve_without_num = "".join(i for i in str(ve) if not i.isdigit())
if "Invalid value for `state` (Waiting for gpu(s))" in ve_without_num:
num = "".join(i for i in str(ve) if i.isdigit())
status["state"] = "Waiting for {} gpu(s)".format(num)
status["reason"] = ""
else:
raise ve
state, reason = status["state"], status["reason"]
if state.lower() in ["complete", "failed", "error", "stopped"]:
print(
"run {} - completed with state {} and reason {}".format(
run.name, state, reason
)
)
break
else:
print(
"run {} - waiting for completion, current state {}".format(
run.name, state
)
)
time.sleep(self.wait_interval)
def get_preprocessing_run(self, user, name):
"""
Method to fetch the preprocessing run with given name for the given user.
Raises exception if the run is not found or on any other connection errors.
*Inputs*
user
User whose preprocessing run has to be fetched.
If the token belongs to a different user, then the token should have permission to fetch the
preprocessing run of the :bash:`user` in the input. They should be in the same DKube group.
name
Name of the preprocessing run to be fetched
"""
return super().get_run("preprocessing", user, name)
def list_preprocessing_runs(self, user, shared=False, filters="*"):
"""
Method to list all the preprocessing runs of a user.
Raises exception on any connection errors.
*Inputs*
user
User whose preprocessing runs must be fetched.
If the token belongs to a different user, then the token should have permission to fetch the
preprocessing runs of the :bash:`user` in the input. They should be in the same DKube group.
filters
Only :bash:`*` is supported now.
Users will be able to filter runs based on state or duration.
"""
return super().list_runs("preprocessing", user, shared)
def delete_preprocessing_run(self, user, name, wait_for_completion=True):
"""
Method to delete a run.
Raises exception if the token belongs to a different user, if no preprocessing run with the given name exists, or on any connection errors.
*Inputs*
user
The token must belong to this user, as a run of a different user cannot be deleted.
name
Name of the run which needs to be deleted.
wait_for_completion
When set to :bash:`True` this method will wait for the preprocessing run to get deleted.
"""
data = super().get_run("preprocessing", user, name, fields="*")
uuid = data["job"]["parameters"]["generated"]["uuid"]
ret = super().delete_run("preprocessing", user, name)
if wait_for_completion:
self._wait_for_rundelete_completion(uuid, "preprocessing", name)
return ret
def update_inference(self, run: DkubeServing, wait_for_completion=True):
"""
Method to update a test inference/deployment in DKube.
Raises Exception in case of errors.
*Inputs*
run
Instance of :bash:`dkube.sdk.rsrcs.serving` class.
Please see the :bash:`Resources` section for details on this class.
Picks defaults for predictor, transformer configs from the existing inference deployment.
If version is not specified then deployment is updated to latest version.
wait_for_completion
When set to :bash:`True` this method will wait for job to complete after submission.
Job is declared complete once it reaches one of the :bash:`complete/failed/error` states
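*Example*
A minimal sketch that rolls an existing deployment to the latest version of its model; the predictor, model and transformer defaults are picked from the existing deployment::
    run = DkubeServing("oneconv", name="mnist-serving")
    dapi.update_inference(run, wait_for_completion=True)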
"""
inference = super().get_run("inference", run.user, run.name)
inference = inference["job"]["parameters"]["inference"]
if run.predictor.image == None:
run.update_serving_image(
None,
inference["serving_image"]["image"]["path"],
inference["serving_image"]["image"]["username"],
inference["serving_image"]["image"]["password"],
)
if run.serving_def.model == None:
run.serving_def.model = inference["model"]
if run.serving_def.version == None:
run.serving_def.version = inference["version"]
if inference["transformer"] == True and run.serving_def.transformer == False:
run.update_transformer_image(
inference["transformer_image"]["image"]["path"],
inference["transformer_image"]["image"]["username"],
inference["transformer_image"]["image"]["password"],
)
run.set_transformer(True, script=inference["transformer_code"])
run.update_transformer_code(
inference["transformer_project"], inference["transformer_commit_id"]
)
elif inference["transformer"] == False and run.serving_def.transformer == True:
li = self.get_model_lineage(
run.serving_def.owner, run.serving_def.model, run.serving_def.version
)
if run.transformer.image == None:
ti = li["run"]["parameters"]["generated"]["training_image"]["image"]
run.update_transformer_image(ti["path"], ti["username"], ti["password"])
if run.serving_def.transformer_project == None:
code = li["run"]["parameters"]["training"]["datums"]["workspace"][
"data"
]
name = code["name"].split(":")[1]
run.update_transformer_code(name, code["version"])
if run.serving_def.min_replicas == 0:
run.serving_def.min_replicas = inference["minreplicas"]
if run.serving_def.max_concurrent_requests == 0:
run.serving_def.max_concurrent_requests = inference["maxconcurrentrequests"]
if super().is_model_catalog_enabled() == True:
run.serving_def.deploy = inference["deploy"]
else:
run.serving_def.deploy = None
super().update_inference(run)
while wait_for_completion:
status = super().get_run("inference", run.user, run.name, fields="status")
state, reason = status["state"], status["reason"]
if state.lower() in ["complete", "failed", "error", "running", "stopped"]:
print(
"run {} - completed with state {} and reason {}".format(
run.name, state, reason
)
)
break
else:
print(
"run {} - waiting for completion, current state {}".format(
run.name, state
)
)
time.sleep(self.wait_interval)
def create_test_inference(self, run: DkubeServing, wait_for_completion=True):
"""
Method to create a test inference on DKube.
Raises Exception in case of errors.
*Inputs*
run
Instance of :bash:`dkube.sdk.rsrcs.serving` class.
Please see the :bash:`Resources` section for details on this class.
If serving image is not updated in the :bash:`run:DkubeServing` argument then,
- If training used a supported standard framework, DKube will pick the appropriate serving image
- If training used a custom image, DKube will try to use the same image for serving
If transformer image is not updated in :bash:`run:DkubeServing` then,
- DKube will use the same image as the training image
If transformer code is not updated in :bash:`run:DkubeServing` then,
- DKube will use the code used for training
wait_for_completion
When set to :bash:`True` this method will wait for job to complete after submission.
Job is declared complete once it reaches one of the :bash:`complete/failed/error` states
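*Example*
A minimal sketch; the model name is an assumption, and the serving/transformer details are filled in from the model lineage as described above::
    serving = DkubeServing("oneconv", name="mnist-inf")
    serving.update_serving_model("mnist-model")
    dapi.create_test_inference(serving)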
"""
assert (
type(run) == DkubeServing
), "Invalid type for run, value must be instance of rsrcs:DkubeServing class"
# Fetch training run details and fill in information for serving
if (
run.predictor.image == None
or (run.serving_def.transformer == True and run.transformer.image == None)
or (
run.serving_def.transformer == True
and run.serving_def.transformer_project == None
)
):
if run.serving_def.version == None:
v = self.get_model_latest_version(
run.serving_def.owner, run.serving_def.model
)
run.serving_def.version = v["uuid"]
li = self.get_model_lineage(
run.serving_def.owner, run.serving_def.model, run.serving_def.version
)
if li == None or li["run"] == None:
li = None
if li == None and run.predictor.image == None:
raise Exception("Lineage is nil, predictor image must be provided.")
if li != None and run.predictor.image == None:
si = li["run"]["parameters"]["generated"]["serving_image"]["image"]
run.update_serving_image(
None, si["path"], si["username"], si["password"]
)
if (
li != None
and run.serving_def.transformer == True
and run.transformer.image == None
):
ti = li["run"]["parameters"]["generated"]["training_image"]["image"]
run.update_transformer_image(ti["path"], ti["username"], ti["password"])
if (
li != None
and run.serving_def.transformer == True
and run.serving_def.transformer_project == None
):
code = li["run"]["parameters"]["training"]["datums"]["workspace"][
"data"
]
name = code["name"].split(":")[1]
run.update_transformer_code(name, code["version"])
# Don't allow prod deploy using this API, if MODEL_CATALOG_ENABLED=true
if (
run.serving_def.deploy == True
and super().is_model_catalog_enabled() == True
):
run.serving_def.deploy = None
super().create_run(run)
while wait_for_completion:
status = super().get_run("inference", run.user, run.name, fields="status")
state, reason = status["state"], status["reason"]
if state.lower() in ["complete", "failed", "error", "running", "stopped"]:
print(
"run {} - completed with state {} and reason {}".format(
run.name, state, reason
)
)
break
else:
print(
"run {} - waiting for completion, current state {}".format(
run.name, state
)
)
time.sleep(self.wait_interval)
def get_test_inference(self, user, name):
"""
Method to fetch the test inference with given name for the given user.
Raises exception if the run is not found or on any other connection errors.
*Inputs*
user
User whose test inference has to be fetched.
If the token belongs to a different user, then the token should have permission to fetch the
serving run of the :bash:`user` in the input. They should be in the same DKube group.
name
Name of the serving run to be fetched
"""
return super().get_run("inference", user, name)
def list_test_inferences(self, user, shared=False, filters="*"):
"""
Method to list all the test inferences of a user.
Raises exception on any connection errors.
*Inputs*
user
User whose test inferences must be fetched.
If the token belongs to a different user, then the token should have permission to fetch the
serving runs of the :bash:`user` in the input. They should be in the same DKube group.
filters
Only :bash:`*` is supported now.
Users will be able to filter runs based on state or duration.
"""
return super().list_runs("inference", user, shared)
def delete_test_inference(self, user, name, wait_for_completion=True):
"""
Method to delete a test inference.
Raises exception if the token belongs to a different user, if no serving run with the given name exists, or on any connection errors.
*Inputs*
user
The token must belong to this user, as a run of a different user cannot be deleted.
name
Name of the run which needs to be deleted.
wait_for_completion
When set to :bash:`True` this method will wait for the inference to get deleted.
"""
data = super().get_run("inference", user, name, fields="*")
uuid = data["job"]["parameters"]["generated"]["uuid"]
ret = super().delete_run("inference", user, name)
if wait_for_completion:
self._wait_for_rundelete_completion(uuid, "inference", name)
return ret
def create_code(self, code: DkubeCode, wait_for_completion=True):
"""
Method to create a code repo on DKube.
Raises Exception in case of errors.
*Inputs*
code
Instance of :bash:`dkube.sdk.rsrcs.code` class.
Please see the :bash:`Resources` section for details on this class.
wait_for_completion
When set to :bash:`True` this method will wait for the code resource to get into one of the complete states.
Code is declared complete once it reaches one of the :bash:`ready/failed/error` states
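*Example*
A minimal sketch; the Git URL and the :bash:`update_git_details` helper are assumptions::
    code = DkubeCode("oneconv", name="mnist-code")
    code.update_git_details("https://github.com/oneconvergence/dkube-examples.git")
    dapi.create_code(code)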
"""
assert (
type(code) == DkubeCode
), "Invalid type for run, value must be instance of rsrcs:DkubeCode class"
super().create_repo(code)
while wait_for_completion:
status = super().get_repo("program", code.user, code.name, fields="status")
state, reason = status["state"], status["reason"]
if state.lower() in ["ready", "failed", "error"]:
print(
"code {} - completed with state {} and reason {}".format(
code.name, state, reason
)
)
break
else:
print(
"code {} - waiting for completion, current state {}".format(
code.name, state
)
)
time.sleep(self.wait_interval)
def get_code(self, user, name):
"""
Method to fetch the code repo with given name for the given user.
Raises exception if the code repo is not found or on any other connection errors.
*Inputs*
user
User whose code has to be fetched.
If the token belongs to a different user, then the token should have permission to fetch the
code of the :bash:`user` in the input. They should be in the same DKube group.
name
Name of the code repo to be fetched
"""
return super().get_repo("program", user, name)
def list_code(self, user, shared=False, filters="*"):
"""
Method to list all the code repos of a user.
Raises exception on any connection errors.
*Inputs*
user
User whose code repos must be fetched.
If the token belongs to a different user, then the token should have permission to fetch the
code repos of the :bash:`user` in the input. They should be in the same DKube group.
filters
Only :bash:`*` is supported now.
Users will be able to filter code repos based on state or the source.
"""
return super().list_repos("program", user, shared)
def delete_code(self, user, name, force=False):
"""
Method to delete a code repo.
Raises exception if the token belongs to a different user, if no code repo with the given name exists, or on any connection errors.
*Inputs*
user
The token must belong to this user, as the code repo of a different user cannot be deleted.
name
Name of the code repo which needs to be deleted.
"""
super().delete_repo("program", user, name, force=force)
################### Feature Store ############################
def create_featureset(self, featureset: DkubeFeatureSet, wait_for_completion=True):
"""
Method to create a featureset on DKube.
*Available in DKube Release: 2.2*
*Inputs*
featureset
Instance of :bash:`dkube.sdk.rsrcs.featureSet` class.
Please see the :bash:`Resources` section for details on this class.
wait_for_completion
When set to :bash:`True` this method will wait for the featureset resource to be ready, i.e. created
with a v1 version in :bash:`synced` state
*Outputs*
A dictionary object with response status
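*Example*
A minimal sketch; the constructor arguments shown are assumptions, see the :bash:`Resources` section::
    fs = DkubeFeatureSet(name="mnist-fs")
    resp = dapi.create_featureset(fs, wait_for_completion=True)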
"""
assert (
type(featureset) == DkubeFeatureSet
), "Invalid type for run, value must be instance of rsrcs:DkubeFeatureset class"
response = super().create_featureset(featureset)
if response["code"] == 200 and featureset.featurespec_path is not None:
spec_response = super().featureset_upload_featurespec(
featureset.featureset.name, featureset.featurespec_path
)
if spec_response["code"] != 200:
self.delete_featureset(featureset.featureset.name)
return spec_response
while wait_for_completion:
versions = super().get_featureset_versions(featureset.featureset.name)
if versions is None:
print("create_featureset: waiting for featureset to be setup")
time.sleep(self.wait_interval)
continue
version_status = DKubeFeatureSetUtils().get_version_status(versions, "v1")
if version_status.lower() == "synced":
break
print("create_featureset: waiting for featureset to be setup")
time.sleep(self.wait_interval)
return response
def delete_featuresets(self, featureset_list):
"""
Method to delete a list of featuresets on DKube.
Raises Exception in case of errors.
*Available in DKube Release: 2.2*
*Inputs*
featureset_list
list of featureset names
example: ["mnist-fs", "titanic-fs"]
*Outputs*
A dictionary object with response status with the list of deleted featureset names
"""
assert (
featureset_list
and isinstance(featureset_list, list)
and all(isinstance(featureset, str) for featureset in featureset_list)
), "Invalid parameter, value must be a list of featureset names"
return super().delete_featureset(featureset_list)
def delete_featureset(self, name):
"""
Method to delete a featureset.
*Available in DKube Release: 2.2*
*Inputs*
name
featureset name to be deleted.
example: "mnist-fs"
*Outputs*
A dictionary object with response status and the deleted featureset name
"""
assert name and isinstance(
name, str
), "Invalid parameter, value must be a featureset name"
return super().delete_featureset([name])
def commit_featureset(self, **kwargs):
"""
Method to commit sticky featuresets.
featureset should be in ready state. It will be in created state if no featurespec is uploaded.
If the featureset is in created state, the following will happen.
a) If metadata is passed, it will be uploaded as featurespec
b) If no metadata is passed, it derives from df and uploads it.
If the featureset is in ready state, the following will happen.
a) metadata if passed any will be ignored
b) featurespec will be downloaded for the specified featureset and df is validated for conformance.
If name is specified, it derives the path for committing the features.
If path is also specified, it doesn't derive the path. It uses the specified path. However, the path should be a mount path into the DKube store.
If df is not specified, it assumes the df is already written to the featureset path. Features can be written to featureset mount path using DkubeFeatureSet.write_features
*Available in DKube Release: 2.2*
*Inputs*
name
featureset name or None
example: name='fset'
df
Dataframe with features to be written
None or empty df are invalid
type: pandas.DataFrame
metadata
optional yaml object with name, description and schema fields or None
example:metadata=[{'name':age, 'description:'', 'schema':int64}]
path
Mount path where featureset is mounted or None
example: path='/opt/dkube/fset'
*Outputs*
Dictionary with response status
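*Example*
A minimal sketch committing a small DataFrame; assumes the featureset "titanic-fs" exists and is in ready state::
    import pandas as pd
    df = pd.DataFrame({"age": [34, 51], "fare": [7.25, 71.28]})
    dapi.commit_featureset(name="titanic-fs", df=df)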
"""
name = kwargs.get("name", None)
df = kwargs.get("df", None)
metadata = kwargs.get("metadata", None)
path = kwargs.get("path", None)
merge = kwargs.get("merge", True)
dftype = kwargs.get("dftype", "Py")
if df is not None:
    assert not df.empty, "df should not be empty"
else:
# Todo: Handle commit for featuresets mounted as k8s volumes
assert name or path, "name or path should be specified"
featurespec = None
existing_spec = []
if name is not None:
featurespec, valid = super().get_featurespec(name)
assert valid, "featureset not found"
if featurespec:
existing_spec = featurespec
if merge is False:
existing_spec = []
if (dftype == "Py") and (
(len(existing_spec) != len(df.keys()))
and (name is not None)
and (df is not None)
):
if not metadata:
metadata = DKubeFeatureSetUtils().compute_features_metadata(
df, existing_spec
)
assert metadata, "The specified featureset is invalid"
self.upload_featurespec(featureset=name, filepath=None, metadata=metadata)
featurespec = metadata
if (dftype == "Py") and (featurespec is not None) and (df is not None):
isdf_valid = DKubeFeatureSetUtils().validate_features(df, featurespec)
assert isdf_valid, "DataFrame validation failed"
return super().commit_featureset(name, df, path, dftype)
def read_featureset(self, **kwargs):
"""
Method to read a featureset version.
If name is specified, the path is derived. If the featureset is not mounted, a copy is made to the user's home directory.
If path is specified, it should be a mounted path
*Available in DKube Release: 2.2*
*Inputs*
name
featureset to be read
example: name='fset' or None
version
version to be read.
If no version specified, latest version is assumed
example: version='v2' or None
path
path where featureset is mounted.
path='/opt/dkube/fset' or None
*Outputs*
Dataframe object
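*Example*
A minimal sketch reading a specific version of a featureset back into a DataFrame::
    df = dapi.read_featureset(name="titanic-fs", version="v2")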
"""
name = kwargs.get("name", None)
version = kwargs.get("version", None)
path = kwargs.get("path", None)
dftype = kwargs.get("dftype", "Py")
assert (version == None) or isinstance(version, str), "version must be a string"
return super().read_featureset(name, version, path, dftype)
def list_featuresets(self, query=None):
"""
Method to list featuresets based on query string.
Raises Exception in case of errors.
*Available in DKube Release: 2.2*
*Inputs*
query
A query string that is compatible with Bleve search format
*Outputs*
A dictionary object with response status and the list of featuresets
"""
return super().list_featureset(query)
def upload_featurespec(self, featureset=None, filepath=None, metadata=None):
"""
Method to upload feature specification file.
*Available in DKube Release: 2.2*
*Inputs*
featureset
The name of featureset
filepath
Filepath for the feature specification metadata yaml file
metadata
feature specification in yaml object.
One of filepath or metadata should be specified.
*Outputs*
A dictionary object with response status
"""
assert featureset and isinstance(featureset, str), "featureset must be string"
assert bool(filepath) ^ bool(
    metadata
), "Exactly one of filepath or metadata must be specified"
return super().featureset_upload_featurespec(featureset, filepath, metadata)
def get_featureset(self, featureset=None):
"""
Method to retrieve details of a featureset
*Available in DKube Release: 2.2*
*Inputs*
featureset
The name of featureset
*Outputs*
A dictionary object with response status, featureset metadata and feature versions
"""
return super().get_featureset(featureset)
def get_featurespec(self, featureset=None):
"""
Method to retrieve the feature specification of a featureset.
*Available in DKube Release: 2.2*
*Inputs*
featureset
The name of featureset
*Outputs*
A dictionary object with response status and feature specification metadata
"""
return super().get_featurespec(featureset)
###############################################################
def create_dataset(self, dataset: DkubeDataset, wait_for_completion=True):
"""
Method to create a dataset on DKube.
Raises Exception in case of errors.
*Inputs*
dataset
Instance of :bash:`dkube.sdk.rsrcs.dataset` class.
Please see the :bash:`Resources` section for details on this class.
wait_for_completion
When set to :bash:`True` this method will wait for the dataset resource to get into one of the complete states.
Dataset is declared complete once it reaches one of the :bash:`ready/failed/error` states
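*Example*
A minimal sketch; the source helpers and URL are assumptions, see the :bash:`Resources` section for all supported sources::
    dataset = DkubeDataset("oneconv", name="mnist-ds")
    dataset.update_dataset_source(source="pub_url")
    dataset.update_puburl_details("https://example.com/mnist.tar.gz", extract=True)
    dapi.create_dataset(dataset)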
"""
assert (
type(dataset) == DkubeDataset
), "Invalid type for run, value must be instance of rsrcs:DkubeDataset class"
super().create_repo(dataset)
if dataset.datum.remote == True:
wait_for_completion = False
while wait_for_completion:
status = super().get_repo(
"dataset", dataset.user, dataset.name, fields="status"
)
state, reason = status["state"], status["reason"]
if state.lower() in ["ready", "failed", "error"]:
print(
"dataset {} - completed with state {} and reason {}".format(
dataset.name, state, reason
)
)
break
else:
print(
"dataset {} - waiting for completion, current state {}".format(
dataset.name, state
)
)
time.sleep(self.wait_interval)
def get_dataset(self, user, name):
"""
Method to fetch the dataset with given name for the given user.
Raises exception if the dataset is not found or on any other connection errors.
*Inputs*
user
User whose dataset has to be fetched.
If the token belongs to a different user, then the token should have permission to fetch the
dataset of the :bash:`user` in the input. They should be in the same DKube group.
name
Name of the dataset to be fetched
"""
return super().get_repo("dataset", user, name)
def list_datasets(self, user, shared=False, filters="*"):
"""
Method to list all the datasets of a user.
Raises exception on any connection errors.
*Inputs*
user
User whose datasets must be fetched.
If the token belongs to a different user, then the token should have permission to fetch the
datasets of the :bash:`user` in the input. They should be in the same DKube group.
filters
Only :bash:`*` is supported now.
Users will be able to filter datasets based on state or the source.
"""
return super().list_repos("dataset", user, shared)
def delete_dataset(self, user, name, force=False):
"""
Method to delete a dataset.
Raises exception if the token belongs to a different user, if no dataset with the given name exists, or on any connection errors.
*Inputs*
user
The token must belong to this user, as the dataset of a different user cannot be deleted.
name
Name of the dataset which needs to be deleted.
"""
super().delete_repo("dataset", user, name, force=force)
def create_model(self, model: DkubeModel, wait_for_completion=True):
"""
Method to create a model on DKube.
Raises Exception in case of errors.
*Inputs*
model
Instance of :bash:`dkube.sdk.rsrcs.model` class.
Please see the :bash:`Resources` section for details on this class.
wait_for_completion
When set to :bash:`True` this method will wait for the model resource to get into one of the complete states.
Model is declared complete once it reaches one of the :bash:`ready/failed/error` states
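*Example*
A minimal sketch; the source helpers and Git URL are assumptions::
    model = DkubeModel("oneconv", name="mnist-model")
    model.update_model_source(source="git")
    model.update_git_details("https://github.com/oneconvergence/dkube-examples.git")
    dapi.create_model(model)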
"""
assert (
type(model) == DkubeModel
), "Invalid type for run, value must be instance of rsrcs:DkubeModel class"
super().create_repo(model)
while wait_for_completion:
status = super().get_repo("model", model.user, model.name, fields="status")
state, reason = status["state"], status["reason"]
if state.lower() in ["ready", "failed", "error"]:
print(
"model {} - completed with state {} and reason {}".format(
model.name, state, reason
)
)
break
else:
print(
"model {} - waiting for completion, current state {}".format(
model.name, state
)
)
time.sleep(self.wait_interval)
def get_model(self, user, name, publish_details=False):
"""
Method to fetch the model with given name for the given user.
Raises exception if the model is not found or on any other connection errors.
*Inputs*
user
User whose model has to be fetched.
If the token belongs to a different user, then the token should have permission to fetch the
model of the :bash:`user` in the input. They should be in the same DKube group.
name
Name of the model to be fetched
publish_details
When set to :bash:`True`, the model catalog details are also returned for published model versions (DKube 2.3+).
"""
dkubever = self.dkubeinfo["version"]
if (
pversion.parse(dkubever) < pversion.parse("2.3.0.0")
) or publish_details == False:
return super().get_repo("model", user, name)
else:
modelObj = super().get_repo("model", user, name)
versions = modelObj["versions"]
for v in versions:
if v["version"]["model"]["stage"] == "PUBLISHED":
publish = super().get_model_catalog(user, name)
modelObj["publish_details"] = publish
return modelObj
return modelObj
def list_models(self, user, shared=False, published=False, filters="*"):
"""
Method to list all the models of a user.
Raises exception on any connection errors.
*Inputs*
user
User whose models must be fetched.
If the token belongs to a different user, then the token should have permission to fetch the
models of the :bash:`user` in the input. They should be in the same DKube group.
filters
Only :bash:`*` is supported now.
Users will be able to filter models based on state or the source.
published
When set to :bash:`True`, all published models are returned.
"""
if published == True:
return super().list_published_models(user)
return super().list_repos("model", user, shared)
def delete_model(self, user, name, force=False):
"""
Method to delete a model.
Raises exception if the token belongs to a different user, if no model with the given name exists, or on any connection errors.
*Inputs*
user
The token must belong to this user, as the model of a different user cannot be deleted.
name
Name of the model which needs to be deleted.
"""
super().delete_repo("model", user, name, force=force)
def trigger_runs_bycode(self, code, user):
"""
Method to trigger all the runs in dkube which uses the mentioned code.
*Inputs*
code
Name of the code.
user
Owner of the code. All runs of this user will be retriggered.
"""
condition = TriggerCondition(match="code", name=code, user=user)
return super().trigger_runs(condition)
def trigger_runs_bydataset(self, dataset, user):
"""
Method to trigger all the runs in dkube which uses the mentioned dataset in input.
*Inputs*
dataset
Name of the dataset.
user
Owner of the dataset. All runs of this user will be retriggered.
"""
condition = TriggerCondition(match="dataset", name=dataset, user=user)
return super().trigger_runs(condition)
def trigger_runs_bymodel(self, model, user):
"""
Method to trigger all the runs in dkube which uses the mentioned model in input.
*Inputs*
model
Name of the model.
user
Owner of the model. All runs of this user will be retriggered.
"""
condition = TriggerCondition(match="model", name=model, user=user)
return super().trigger_runs(condition)
def get_model_lineage(self, user, name, version):
"""
Method to get lineage of a model version.
*Inputs*
name
Name of the model
version
Version of the model
user
Owner of the model.
"""
return super().get_datum_lineage("model", user, name, version)
def get_dataset_lineage(self, user, name, version):
"""
Method to get lineage of a dataset version.
*Inputs*
name
Name of the dataset
version
Version of the dataset
user
Owner of the dataset.
"""
return super().get_datum_lineage("dataset", user, name, version)
def get_training_run_lineage(self, user, name):
"""
Method to get lineage of a training run.
*Inputs*
name
Name of the run
user
owner of the run
"""
# Get the training run info
run = self.get_training_run(user, name)
runid = run["job"]["parameters"]["generated"]["uuid"]
return super().get_run_lineage("training", user, runid)
def get_preprocessing_run_lineage(self, user, name):
"""
Method to get lineage of a preprocessing run.
*Inputs*
name
Name of the run
user
owner of the run
"""
# Get the preprocessing run info
run = self.get_preprocessing_run(user, name)
runid = run["job"]["parameters"]["generated"]["uuid"]
return super().get_run_lineage("preprocessing", user, runid)
def get_model_versions(self, user, name):
"""
Method to get the versions of model.
Versions are returned in descending order, latest first.
*Inputs*
name
Name of the model
user
owner of the model
"""
model = self.get_model(user, name)
return model["versions"]
def get_model_latest_version(self, user, name):
"""
Method to get the latest version of the given model.
*Inputs*
name
Name of the model
user
owner of the model
"""
versions = self.get_model_versions(user, name)
return versions[0]["version"]
def get_model_version(self, user, name, version):
"""
Method to get details of a version of the given model.
Raises an Exception if the version is not found
*Inputs*
name
Name of the model
version
Version of the model
user
owner of the model
"""
versions = self.get_model_versions(user, name)
for v in versions:
if v["version"]["uuid"] == version:
return v["version"]
raise Exception("{}/{}/{} not found".format(user, name, version))
def get_dataset_versions(self, user, name):
"""
Method to get the versions of dataset.
Versions are returned in descending order, latest first.
*Inputs*
name
Name of the dataset
user
owner of the dataset
"""
dataset = self.get_dataset(user, name)
return dataset["versions"]
def get_dataset_latest_version(self, user, name):
"""
Method to get the latest version of the given dataset.
*Inputs*
name
Name of the dataset
user
owner of the dataset
"""
versions = self.get_dataset_versions(user, name)
return versions[0]["version"]
def get_dataset_version(self, user, name, version):
"""
Method to get details of a version of the given dataset.
Raises an Exception if the version is not found
*Inputs*
name
Name of the dataset
version
Version of the dataset
user
owner of the dataset
"""
versions = self.get_dataset_versions(user, name)
for v in versions:
if v["version"]["uuid"] == version:
return v["version"]
raise Exception("{}/{}/{} not found".format(user, name, version))
def get_datascience_capabilities(self):
"""
Method to get the datascience capabilities of the platform.
Returns the supported frameworks, versions and the corresponding container image details.
"""
return super().get_datascience_capability()
def get_notebook_capabilities(self):
"""
Method to get the notebook capabilities of the platform.
Returns the supported frameworks, versions and the image details.
"""
caps = self.get_datascience_capabilities()
return caps["nb_ide"]["frameworks"]
def get_r_capabilities(self):
"""
Method to get the R language capabilities of the platform.
Returns the supported frameworks, versions and the image details.
"""
caps = self.get_datascience_capabilities()
return caps["r_ide"]["frameworks"]
def get_training_capabilities(self):
"""
Method to get the training capabilities of the platform.
Returns the supported frameworks, versions and the image details.
"""
caps = self.get_datascience_capabilities()
return caps["training"]["frameworks"]
def get_serving_capabilities(self):
"""
Method to get the serving capabilities of the platform.
Returns the supported frameworks, versions and the image details.
"""
caps = self.get_datascience_capabilities()
return caps["serving"]["frameworks"]
def list_frameworks(self):
fw_opts = ["custom"]
fws = self.get_training_capabilities()
for fw in fws:
for v in fw["versions"]:
name = fw["name"] + "_" + v["name"]
fw_opts.append(name)
return json.dumps(fw_opts)
def release_model(self, user, model, version=None, wait_for_completion=True):
"""
Method to release a model to model catalog.
Raises Exception in case of errors.
*Available in DKube Release: 2.2*
*Inputs*
model
Name of the model.
version
Version of the model to be released.
If not passed then latest version is released automatically.
user
Owner of the model.
wait_for_completion
When set to :bash:`True` this method will wait for the release to finish.
The release is complete once the stage of the model changes to :bash:`released/failed/error`
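*Example*
A minimal sketch releasing the latest version of a model; the names are placeholders::
    dapi.release_model("oneconv", "mnist-model")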
"""
if version == None:
version = self.get_model_latest_version(user, model)
version = version["uuid"]
super().release_model(user, model, version)
while wait_for_completion:
v = self.get_model_version(user, model, version)
stage = v["model"]["stage"]
reason = v["model"]["reason"]
if stage.lower() in ["released", "failed", "error"]:
print(
"release {}/{} - completed with state {} and reason {}".format(
model, version, stage, reason
)
)
break
else:
print(
"release {}/{} - waiting for completion, current state {}".format(
model, version, stage
)
)
time.sleep(self.wait_interval)
def publish_model(
self, name, description, details: DkubeServing, wait_for_completion=True
):
"""
Method to publish a model to model catalog.
Raises Exception in case of errors.
*Available in DKube Release: 2.2*
*Inputs*
name
Name with which the model must be published in the model catalog.
description
Human readable text for the model being published
details
Instance of :bash:`dkube.sdk.rsrcs.serving` class.
Please see the :bash:`Resources` section for details on this class.
If serving image is not updated in the :bash:`run:DkubeServing` argument then,
- If training used a supported standard framework, DKube will pick the appropriate serving image
- If training used a custom image, DKube will try to use the same image for serving
If transformer image is not updated in :bash:`run:DkubeServing` then,
- DKube will use the same image as the training image
If transformer code is not updated in :bash:`run:DkubeServing` then,
- DKube will use the code used for training
wait_for_completion
When set to :bash:`True` this method will wait for the publish to finish.
Publishing is complete once the stage of the model changes to :bash:`published/failed/error`
"""
run = details
user, model, version = (
run.serving_def.owner,
run.serving_def.model,
run.serving_def.version,
)
# Fetch training run details and fill in information for serving
if (
run.predictor.image == None
or (run.serving_def.transformer == True and run.transformer.image == None)
or (
run.serving_def.transformer == True
and run.serving_def.transformer_project == None
)
):
if run.serving_def.version == None:
v = self.get_model_latest_version(
run.serving_def.owner, run.serving_def.model
)
run.serving_def.version = v["uuid"]
li = self.get_model_lineage(
run.serving_def.owner, run.serving_def.model, run.serving_def.version
)
if run.predictor.image == None:
si = li["run"]["parameters"]["generated"]["serving_image"]["image"]
run.update_serving_image(
None, si["path"], si["username"], si["password"]
)
if run.serving_def.transformer == True and run.transformer.image == None:
ti = li["run"]["parameters"]["generated"]["training_image"]["image"]
run.update_transformer_image(ti["path"], ti["username"], ti["password"])
if (
run.serving_def.transformer == True
and run.serving_def.transformer_project == None
):
code = li["run"]["parameters"]["training"]["datums"]["workspace"][
"data"
]
cname = code["name"].split(":")[1]
run.update_transformer_code(cname, code["version"])
data = {"name": name, "description": description, "serving": run.serving_def}
super().publish_model(user, model, version, data)
while wait_for_completion:
v = self.get_model_version(user, model, version)
stage = v["model"]["stage"]
reason = v["model"]["reason"]
if stage.lower() in ["published", "failed", "error"]:
print(
"publish {}/{} - completed with state {} and reason {}".format(
model, version, stage, reason
)
)
break
else:
print(
"publish {}/{} - waiting for completion, current state {}".format(
model, version, stage
)
)
time.sleep(self.wait_interval)
def create_model_deployment(
self,
user,
name,
model,
version,
description=None,
stage_or_deploy="stage",
min_replicas=0,
max_concurrent_requests=0,
wait_for_completion=True,
):
"""
Method to create a serving deployment for a model in the model catalog.
Raises Exception in case of errors.
*Inputs*
user
Name of the user creating the deployment
name
Name of the deployment. Must be unique
description
User readable description of the deployment
model
Name of the model to be deployed
version
Version of the model to be deployed
stage_or_deploy
Default is set to :bash:`stage` which means the model deployment is staged for testing before
deploying it for production.
Change to :bash:`deploy` to deploy the model in production
min_replicas
Minimum number of replicas that each Revision should have.
If not provided, uses the value set in the platform config map.
max_concurrent_requests
Soft limit that specifies the maximum number of requests an inference pod can process at a time.
If not provided, uses the value set in the platform config map.
wait_for_completion
When set to :bash:`True` this method will wait for job to complete after submission.
Job is declared complete once it reaches one of the :bash:`complete/failed/error` states
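*Example*
A minimal sketch staging the latest version of a published model; the names are placeholders::
    v = dapi.get_model_latest_version("oneconv", "mnist-model")
    dapi.create_model_deployment("oneconv", "mnist-deploy", "mnist-model", v["uuid"],
    stage_or_deploy="stage")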
"""
assert stage_or_deploy in [
"stage",
"deploy",
], "Invalid value for stage_or_deploy parameter."
run = DkubeServing(user, name=name, description=description)
run.update_serving_model(model, version=version)
run.update_autoscaling_config(min_replicas, max_concurrent_requests)
dkubever = self.dkubeinfo["version"]
if pversion.parse(dkubever) < pversion.parse("2.3.0.0"):
mcitem = self.get_modelcatalog_item(
user, modelcatalog=model, version=version
)
run.update_serving_image(
image_url=mcitem["serving"]["images"]["serving"]["image"]["path"]
)
else:
mcitem = super().get_model_catalog(user, model)
versions = mcitem["versions"]
for v in versions:
if v["version"] == version:
serving_image = v["serving"]["images"]["serving"]["image"]["path"]
run.update_serving_image(image_url=serving_image)
if stage_or_deploy == "stage":
super().stage_model(run)
if stage_or_deploy == "deploy":
super().deploy_model(run)
while wait_for_completion:
status = super().get_run("inference", run.user, run.name, fields="status")
state, reason = status["state"], status["reason"]
if state.lower() in ["complete", "failed", "error", "running"]:
print(
"run {} - completed with state {} and reason {}".format(
run.name, state, reason
)
)
break
else:
print(
"run {} - waiting for completion, current state {}".format(
run.name, state
)
)
time.sleep(self.wait_interval)
def delete_model_deployment(self, user, name, wait_for_completion=True):
"""
Method to delete a model deployment.
Raises exception if the token belongs to a different user, if no serving run with the given name exists, or on any connection errors.
*Inputs*
user
The token must belong to this user, as a run of a different user cannot be deleted.
name
Name of the run which needs to be deleted.
wait_for_completion
When set to :bash:`True` this method will wait for the deployment to get deleted.
"""
data = super().get_run("inference", user, name, fields="*")
uuid = data["job"]["parameters"]["generated"]["uuid"]
ret = super().delete_run("inference", user, name)
if wait_for_completion:
self._wait_for_rundelete_completion(uuid, "inference", name)
return ret
def list_model_deployments(self, user, shared=False, filters="*"):
"""
Method to list all the model deployments.
Raises exception on any connection errors.
*Inputs*
user
Name of the user.
filters
Only :bash:`*` is supported now.
Users will be able to filter runs based on state or duration.
"""
deps = []
resp = super().list_runs("inference", user, shared)
for item in resp:
for inf in item["jobs"]:
deploy = inf["parameters"]["inference"]["deploy"]
# MAK - BUG - there is no way today from backend response to separate the test-inferences
# vs serving deployments. So appending all.
deps.append(inf)
return deps
def modelcatalog(self, user):
"""
Method to fetch the model catalog from DKube.
Model catalog is the list of models published by data scientists and
ready for staging or deployment on a production cluster.
The user must have permission to fetch the model catalog.
*Available in DKube Release: 2.2*
*Inputs*
user
Name of the user.
"""
return super().modelcatalog(user)
def get_modelcatalog_item(self, user, modelcatalog=None, model=None, version=None):
"""
Method to get an item from modelcatalog
Raises exception on any connection errors.
*Available in DKube Release: 2.2*
*Inputs*
user
Name of the user.
modelcatalog
Model catalog name
model
Name of the model
version
Version of the model
"""
if modelcatalog is None and model is None:
return "either model catalog name or model name should be provided"
if version is None:
return "Model Version must be provided"
if modelcatalog:
mc = self.modelcatalog(user)
for item in mc:
if item["name"] == modelcatalog:
for iversion in item["versions"]:
if iversion["model"]["version"] == version:
return iversion
raise Exception("{}.{} not found in model catalog".format(model, version))
else:
mc = self.modelcatalog(user)
for item in mc:
if item["model"]["name"] == model:
for iversion in item["versions"]:
if iversion["model"]["version"] == version:
return iversion
raise Exception("{}.{} not found in model catalog".format(model, version))
def delete_modelcatalog_item(
self, user, modelcatalog=None, model=None, version=None
):
"""
Method to delete an item from modelcatalog
Raises exception on any connection errors.
*Available in DKube Release: 2.2*
*Inputs*
user
Name of the user.
modelcatalog
Model catalog name
model
Name of the model
version
Version of the model
"""
if modelcatalog is None and model is None:
return "either model catalog name or model name should be provided"
if version is None:
return "Model Version must be provided"
if modelcatalog:
response = self._api.delete_model_catalog_item(user, modelcatalog, version)
return response
else:
mc = self.modelcatalog(user)
for item in mc:
if item["model"]["name"] == model:
modelcatalog = item["name"]
response = self._api.delete_model_catalog_item(
user, modelcatalog, version
)
return response
raise Exception("{}.{} not found in model catalog".format(model, version))
def list_projects(self):
"""
Return list of DKube projects.
*Available in DKube Release: 2.2*
"""
response = self._api.get_all_projects().to_dict()
assert response["response"]["code"] == 200, response["response"]["message"]
return response["data"]
def create_project(self, project: DkubeProject):
"""Creates DKube Project.
*Available in DKube Release: 2.2*
*Inputs*
project
instance of :bash:`dkube.sdk.rsrcs.DkubeProject` class.
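*Example*
A minimal sketch; the :bash:`DkubeProject` constructor arguments shown are assumptions::
    project = DkubeProject("cats-vs-dogs", description="image classification challenge")
    dapi.create_project(project)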
"""
assert (
type(project) == DkubeProject
), "Invalid type for project, value must be instance of rsrcs:DkubeProject class"
response = self._api.create_project(project).to_dict()
assert response["response"]["code"] == 200, response["response"]["message"]
return response["data"]
def update_project(self, project_id, project: DkubeProject):
"""Update project details.
*Available in DKube Release: 2.2*
Note: the details and eval_details fields are base64 encoded.
*Inputs*
project_id
id of the project
project
instance of :bash:`dkube.sdk.rsrcs.DkubeProject` class.
"""
assert (
type(project) == DkubeProject
), "Invalid type for project, value must be instance of rsrcs:DkubeProject class"
project.id = project_id
response = self._api.update_one_project(
project_id=project.id, data=project
).to_dict()
assert response["code"] == 200, response["message"]
def get_project_id(self, name):
""" "Get project id from project name.
*Available in DKube Release: 2.2*
*Inputs*
name
name of the project
"""
response = self._api.get_all_projects().to_dict()
assert response["response"]["code"] == 200, response["response"]["message"]
for project in response["data"]:
if project["name"] == name:
return project["id"]
return None
def get_project(self, project_id):
"""Get project details.
*Available in DKube Release: 2.2*
*Inputs*
project_id
id of the project
"""
response = self._api.get_one_project(project_id).to_dict()
assert response["response"]["code"] == 200, response["response"]["message"]
return response["data"]
def get_leaderboard(self, project_id):
"""Get project's leaderboard details.
*Available in DKube Release: 2.2*
*Inputs*
project_id
id of the project
"""
response = self._api.get_all_project_submissions(project_id).to_dict()
assert response["response"]["code"] == 200, response["response"]["message"]
return response["data"]
def delete_project(self, project_id):
"""Delete project. This only deletes the project and not the associated resources.
*Available in DKube Release: 2.2*
*Inputs*
project_id
id of the project
"""
project_ids = {"project_ids": [project_id]}
response = self._api.projects_delete_list(project_ids).to_dict()
assert response["code"] == 200, response["message"]
def upload_model(
self, user, name, filepath, extract=False, wait_for_completion=True
):
"""Upload model. This creates a model and uploads the file residing in your local workstation.
Supported formats are tar, gz, tar.gz, tgz, zip, csv and txt.
*Available in DKube Release: 2.2*
*Inputs*
user
name of user under which model is to be created in dkube.
name
name of model to be created in dkube.
filepath
path of the file to be uploaded
extract
if extract is set to True, the file will be extracted after upload.
wait_for_completion
When set to :bash:`True` this method will wait for the model resource to get into one of the complete states.
Model is declared complete once it reaches one of the :bash:`ready/failed/error` states
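*Example*
A minimal sketch uploading a local archive as a model; the names and path are placeholders::
    dapi.upload_model("oneconv", "mnist-model", "/tmp/model.tar.gz", extract=True)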
"""
upl_resp = super().upload_model(user, name, filepath, extract=extract)
while wait_for_completion:
status = super().get_repo("model", user, name, fields="status")
state, reason = status["state"], status["reason"]
if state.lower() in ["ready", "failed", "error"]:
print(
"model {} - completed with state {} and reason {}".format(
name, state, reason
)
)
break
else:
print(
"model {} - waiting for completion, current state {}".format(
name, state
)
)
time.sleep(self.wait_interval)
[docs] def download_dataset(self, path, user, name, version=None):
"""This method is to download a version of dataset.
Downloaded content will be copied in the specified path.
*Inputs*
path
Target path where the dataset must be downloaded.
user
name of user who owns the dataset.
name
name of dataset.
version
version of the dataset.
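*Example* (a minimal sketch; arguments are hypothetical, the latest version is used when none is given)::
dapi.download_dataset("/tmp/data", "oneconv", "mnist")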
"""
if version is None:
version = self.get_dataset_latest_version(user, name)
version = version["uuid"]
super().download_dataset(path, user, name, version)
[docs] def download_model(self, path, user, name, version=None):
"""This method is to download a version of model.
Downloaded content will be copied in the specified path.
*Inputs*
path
Target path where the dataset must be downloaded.
user
name of user who owns the dataset.
name
name of dataset.
version
version of the dataset.
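*Example* (a minimal sketch; arguments are hypothetical)::
dapi.download_model("/tmp/model", "oneconv", "my-model")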
"""
if version is None:
version = self.get_model_latest_version(user, name)
version = version["uuid"]
super().download_model(path, user, name, version)
def _wait_for_rundelete_completion(self, uuid, _class, name):
dkubever = self.dkubeinfo["version"]
# MAK - ideally the target version should be 2.2.7.0 and it should
# be sufficient to check for older release(s), but there is an internal patch to enable
# automation suite which returns release version as 2.2.1.13
if pversion.parse(dkubever) < pversion.parse("2.2.1.13"):
# Older release - waiting for deletion to complete not supported
return
try: # MAK - try block can be removed, once 2.2.7.0 is released
while True:
data = super().get_run_byuuid(uuid)
state = data["parameters"]["generated"]["status"]["sub_state"]
if state is None:  # MAK - check can be removed once 2.2.7.0 is released
# Older release - ignore the error
break
if state.lower() == "deleting":
print(
"{} {} - waiting for deletion, current state {}".format(
_class, name, state
)
)
time.sleep(self.wait_interval)
elif state.lower() == "deleted":
print("{} {} - deleted successfully".format(_class, name))
break
else:
# Avoid a busy loop on any unexpected transient state
time.sleep(self.wait_interval)
except ApiException as ae:
if ae.status == 404:
# Older release - ignore the error
print(
"ignoring 404 - fetching deleted jobs failed, older release of dkube"
)
else:
raise ae
[docs] def list_inference_endpoints(self):
"""
Method to list all the inference endpoints in the DKube cluster.
Raises exception on any connection errors.
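*Example* (a minimal sketch)::
endpoints = dapi.list_inference_endpoints()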
"""
return super().list_inference_endpoints()
[docs] def list_cicd_images(self, repo=None):
"""
Method to list all the CI/CD images, plus any images manually added in DKube.
*Inputs*
repo
Git repo URL. If provided, only returns images generated for that repo
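*Example* (a minimal sketch; the repo URL is hypothetical)::
images = dapi.list_cicd_images(repo="https://github.com/org/repo")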
"""
response = self._api.get_all_cicd_images().to_dict()
assert response["response"]["code"] == 200, response["response"]["message"]
if repo is None:
return response["data"]
images = []
for entry in response["data"]:
if "repo" in entry["image"] and entry["image"]["repo"] == repo:
images.append(entry)
return images
########## Model monitor APIs ##########
[docs] def modelmonitor_create(
self, modelmonitor: DkubeModelmonitor, wait_for_completion=True
):
"""
Method to create a model monitor on DKube.
*Available in DKube Release: 3.0*
*Inputs*
modelmonitor
Instance of :bash:`dkube.sdk.rsrcs.modelmonitor.DkubeModelmonitor` class.
Please see the :bash:`Resources` section for details on this class.
wait_for_completion
When set to :bash:`True` this method will wait for the modelmonitor resource to reach a terminal state.
The modelmonitor is declared complete when it is in one of the :bash:`init/ready/error` states.
*Outputs*
a dictionary object with response status
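*Example* (a minimal sketch; the model name and description are hypothetical and other settings keep their defaults)::
mm = DkubeModelmonitor(model_name="insurance-model", description="drift monitor")
resp = dapi.modelmonitor_create(mm)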
"""
assert isinstance(
modelmonitor, DkubeModelmonitor
), "Invalid type for model monitor, value must be an instance of rsrcs:DkubeModelmonitor class"
response = super().create_model_monitor(modelmonitor)
while wait_for_completion:
mm_config = super().get_modelmonitor_configuration(response["uuid"])
state = mm_config["status"]["state"]
if state.lower() in ["init", "ready", "error"]:
print(
"ModelMonitor {} - completed with state {} and reason {}".format(
modelmonitor.name, state, response["message"]
)
)
break
else:
print(
"ModelMonitor {} - waiting for completion, current state {}".format(
modelmonitor.name, state
)
)
time.sleep(self.wait_interval)
return response
[docs] def modelmonitor_list(self, **kwargs):
"""
Method to list the modelmonitors.
*Available in DKube Release: 3.0*
*Inputs*
tags
string
page
integer
archived
boolean, when archived=True, list the archived modelmonitors
*Outputs*
A list containing the modelmonitors
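*Example* (a minimal sketch)::
monitors = dapi.modelmonitor_list(archived=False)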
"""
tags = kwargs.get("tags")
page = kwargs.get("page")
archived = kwargs.get("archived", False)
query_params = {}
if tags:
query_params["tags"] = tags
if page:
query_params["page"] = page
if archived:
query_params["archived"] = archived
return super().list_modelmonitor(query_params)
[docs] def modelmonitor_get_id(self, name=None):
"""
Method to get the id of a model monitor.
*Available in DKube Release: 3.0*
*Inputs*
name
Name of the modelmonitor
*Outputs*
The uuid of the modelmonitor
"""
response = super().get_modelmonitor_id(name).to_dict()["data"]
if response is not None:
return response.get(name)
else:
return None
[docs] def modelmonitor_get_alertid(self, id=None, alert_name=None):
"""
Method to get the alert id of a modelmonitor.
*Available in DKube Release: 3.0*
*Inputs*
id
Modelmonitor Id
alert_name
Name of the alert
*Outputs*
The id of the alert
"""
response = super().get_modelmonitor_alerts(id)
for alert in response:
if alert["name"] == alert_name:
return alert["id"]
return None
[docs] def modelmonitor_get(self, id=None):
"""
Method to get the modelmonitor.
*Available in DKube Release: 3.0*
*Inputs*
id
Modelmonitor Id
*Outputs*
A dictionary containing the configuration of the modelmonitor
"""
return super().get_modelmonitor_configuration(id)
[docs] def modelmonitor_get_datasets(self, id=None, data_class: DatasetClass = None):
"""
Method to get the datasets of the modelmonitor.
*Available in DKube Release: 3.0*
*Inputs*
id
Modelmonitor Id
data_class
data class of the dataset in the modelmonitor; must be one of ["TrainData","PredictData","LabelledData"].
By default set to None.
*Outputs*
if data_class is None:
A list of dictionaries containing all the datasets information.
if data_class is 'PredictData' or 'LabelledData' or 'TrainData':
An individual data dictionary for that data class.
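*Example* (a minimal sketch; the monitor name is hypothetical)::
mm_id = dapi.modelmonitor_get_id("insurance-monitor")
train = dapi.modelmonitor_get_datasets(id=mm_id, data_class="TrainData")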
"""
datasets = super().get_modelmonitor_dataset(id)
if data_class is None:
return datasets
else:
for data in datasets:
if data["_class"] == data_class:
return data
[docs] def modelmonitor_get_alerts(self, id=None):
"""
Method to get the alerts of the modelmonitor.
*Available in DKube Release: 3.0*
*Inputs*
id
Modelmonitor Id
*Outputs*
a list of dictionaries containing individual alert information
"""
return super().get_modelmonitor_alerts(id)
[docs] def modelmonitors_delete(self, ids=[]):
"""
Method to delete multiple modelmonitors.
*Available in DKube Release: 3.0*
*Inputs*
ids
List of modelmonitor Ids to be deleted. Example: ["cd123","345fg"]
*Outputs*
A dictionary object with response status
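*Example* (a minimal sketch; the ids are hypothetical)::
dapi.modelmonitors_delete(["cd123", "345fg"])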
"""
return super().delete_modelmonitors(ids)
[docs] def modelmonitor_delete(self, id=None):
"""
Method to delete a single modelmonitor.
*Available in DKube Release: 3.0*
*Inputs*
id
Modelmonitor Id
*Outputs*
A dictionary object with response status
"""
return super().delete_modelmonitors([id])
[docs] def modelmonitor_get_metricstemplate(self):
"""
Method to get the metrics supported for the modelmonitor.
*Available in DKube Release: 3.0*
*Outputs*
a list of dictionaries containing metrics template for Regression and Classification
"""
return super().get_modelmonitor_template()
[docs] def modelmonitor_delete_alert(self, id=None, alert_id=None):
"""
Method to delete an alert from the modelmonitor.
*Available in DKube Release: 3.0*
*Inputs*
id
Modelmonitor Id
alert_id
Id of a modelmonitor alert
"""
return super().delete_modelmonitor_alert(id, [alert_id])
[docs] def modelmonitor_add_alert(
self, id=None, alert_data: DkubeModelmonitoralert = None
):
"""
Method to add an alert to the modelmonitor.
*Available in DKube Release: 3.0*
*Inputs*
id
Modelmonitor Id
alert_data
Instance of :bash:`dkube.sdk.rsrcs.modelmonitor.DkubeModelmonitoralert` class.
Please see the :bash:`Resources` section for details on this class.
*Outputs*
a dictionary object with response status
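*Example* (a minimal sketch; assumes the alert can be built with just a name, see the :bash:`Resources` section for the full set of fields)::
alert = DkubeModelmonitoralert(name="feature-drift")  # hypothetical alert
dapi.modelmonitor_add_alert(id=mm_id, alert_data=alert)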
"""
if self.modelmonitor_get_datasets(
id=id, data_class="TrainData"
) and self.modelmonitor_get_datasets(id=id, data_class="PredictData"):
alert_dict = json.loads(alert_data.to_JSON())
alert_dict["class"] = alert_dict.pop("_class")
response = super().modelmonitor_addalert(id, {"data": [alert_dict]})
return response
else:
print("Add train and predict data before adding alerts")
return None
[docs] def modelmonitor_add_dataset(
self, id=None, data: DkubeModelmonitordataset = None, wait_for_completion=True
):
"""
Method to add a dataset to the modelmonitor.
*Available in DKube Release: 3.0*
*Inputs*
id
Modelmonitor Id
data
Instance of :bash:`dkube.sdk.rsrcs.modelmonitor.DkubeModelmonitordataset` class.
Please see the :bash:`Resources` section for details on this class.
wait_for_completion
When set to :bash:`True` this method will wait for the modelmonitor resource to reach a terminal state before adding the dataset.
The modelmonitor is declared complete when it is in one of the :bash:`init/ready/error` states.
*Outputs*
a dictionary object with response status
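*Example* (a minimal sketch; assumes the dataset object can be built with just a name, see the :bash:`Resources` section)::
data = DkubeModelmonitordataset(name="insurance-data")  # hypothetical dataset
dapi.modelmonitor_add_dataset(id=mm_id, data=data)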
"""
data_dict = json.loads(data.to_JSON())
data_dict["class"] = data_dict.pop("_class")
response = super().modelmonitor_adddataset(id, {"data": data_dict})
while wait_for_completion:
mm_config = super().get_modelmonitor_configuration(id)
state = mm_config["status"]["state"]
if state.lower() in ["init", "ready", "error"]:
break
else:
print(
"Modelmonitor add dataset not completed yet, current state {}".format(
state
)
)
time.sleep(self.wait_interval)
return response["response"]
[docs] def modelmonitor_archive(self, id=None):
"""
Method to archive the modelmonitor
*Available in DKube Release: 3.0*
*Inputs*
id
Modelmonitor Id
*Outputs*
a dictionary object with response status
"""
return super().modelmonitor_archive(id, archive=True)
[docs] def modelmonitor_unarchive(self, id=None):
"""
Method to unarchive the modelmonitor
*Available in DKube Release: 3.0*
*Inputs*
id
Modelmonitor Id
*Outputs*
a dictionary object with response status
"""
return super().modelmonitor_archive(id, archive=False)
[docs] def modelmonitor_start(self, id=None, wait_for_completion=True):
"""
Method to start the modelmonitor
*Available in DKube Release: 3.0*
*Inputs*
id
Modelmonitor Id
wait_for_completion
When set to :bash:`True` this method will wait for the modelmonitor resource to reach a terminal state.
The start is declared complete when the modelmonitor is in one of the :bash:`init/active/error` states; the monitor is running once it reaches the :bash:`active` state.
*Outputs*
a dictionary object with response status
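*Example* (a minimal sketch; :bash:`mm_id` as obtained from :bash:`modelmonitor_get_id`)::
dapi.modelmonitor_start(id=mm_id)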
"""
response = super().modelmonitor_state(id, "start")
while wait_for_completion:
mm_state = self.modelmonitor_get(id=id)["status"]["state"]
if mm_state.lower() in ["init", "active", "error"]:
break
else:
print("ModelMonitor {} - is in {} state".format(id, mm_state))
time.sleep(self.wait_interval)
return response
[docs] def modelmonitor_stop(self, id=None):
"""
Method to stop the modelmonitor
*Available in DKube Release: 3.0*
*Inputs*
id
Modelmonitor Id
*Outputs*
a dictionary object with response status
"""
return super().modelmonitor_state(id, "stop")
[docs] def modelmonitor_update_dataset(
self,
id=None,
data_class: DatasetClass = None,
data: DkubeModelmonitordataset = None,
wait_for_completion=True,
):
"""
Method to update the modelmonitor dataset
*Available in DKube Release: 3.0*
*Inputs*
id
Modelmonitor Id
data
Instance of :bash:`dkube.sdk.rsrcs.modelmonitor.DkubeModelmonitordataset` class.
Please see the :bash:`Resources` section for details on this class.
data_class
Instance of :bash:`dkube.sdk.rsrcs.modelmonitor.DatasetClass` class.
Enum = ["TrainData","PredictData","LabelledData"]
Please see the :bash:`Resources` section for details on this class.
wait_for_completion
When set to :bash:`True` this method will wait for the modelmonitor resource to reach a terminal state before updating the dataset.
The modelmonitor is declared complete when it is in one of the :bash:`init/ready/error` states; dataset updates are not allowed while the monitor is in the :bash:`active` state.
*Outputs*
a dictionary object with response status
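*Example* (a minimal sketch; assumes a dataset object carrying only the fields to change)::
new_data = DkubeModelmonitordataset(name="insurance-data-v2")  # hypothetical
dapi.modelmonitor_update_dataset(id=mm_id, data_class="TrainData", data=new_data)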
"""
data_id = self.modelmonitor_get_datasets(id, data_class=data_class)["id"]
data_dict = json.loads(data.to_JSON())
for k in list(data_dict.keys()):
if data_dict[k] is None and k != "_class":
del data_dict[k]
data_dict["class"] = data_dict.pop("_class")
if data_dict["class"] is None:
data_dict["class"] = data_class
response = super().update_modelmonitor_dataset(id, data_id, data_dict)
while wait_for_completion:
mm_state = self.modelmonitor_get(id=id)["status"]["state"]
if mm_state.lower() in ["init", "error", "ready"]:
break
else:
print("ModelMonitor {} - is in {} state".format(id, mm_state))
time.sleep(self.wait_interval)
return response
[docs] def modelmonitor_update_alert(
self, id=None, alert: DkubeModelmonitoralert = None, alert_id=None
):
"""
Method to update the modelmonitor alert
*Available in DKube Release: 3.0*
*Inputs*
id
Modelmonitor Id
alert
Instance of :bash:`dkube.sdk.rsrcs.modelmonitor.DkubeModelmonitoralert` class.
Please see the :bash:`Resources` section for details on this class.
alert_id
ID of the alert you want to update in the modelmonitor
*Outputs*
a dictionary object with response status
"""
alert_dict = json.loads(alert.to_JSON())
alert_dict["class"] = alert_dict.pop("_class")
return super().update_modelmonitor_alert(id, alert_id, alert_dict)
[docs] def modelmonitor_update(
self, id=None, config: DkubeModelmonitor = None, wait_for_completion=True
):
"""
Method to update the modelmonitor
*Available in DKube Release: 3.0*
*Inputs*
id
Modelmonitor Id
config
Instance of :bash:`dkube.sdk.rsrcs.modelmonitor.DkubeModelmonitor` class.
Please see the :bash:`Resources` section for details on this class to check
what can be updated.
*Outputs*
a dictionary object with response status
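*Example* (a minimal sketch; unset fields are stripped before the update is sent)::
cfg = DkubeModelmonitor(model_name="insurance-model")  # hypothetical config
dapi.modelmonitor_update(id=mm_id, config=cfg)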
"""
config_dict = config.__dict__["modelmonitor"].__dict__
config_dict = {k.replace("_", "", 1): v for k, v in config_dict.items()}
rem_list = [
"datasets",
"model",
"alerts",
"performance_metrics_template",
"updated_at",
"id",
"drift_detection_algorithm",
"created_at",
"pipeline_component",
"status",
"owner",
"name",
"discriminator",
]
for key in rem_list:
config_dict.pop(key, None)
for k in list(config_dict.keys()):
if config_dict[k] is None or config_dict[k] == []:
del config_dict[k]
response = super().update_modelmonitor_config(id, config_dict)
while wait_for_completion:
mm_state = self.modelmonitor_get(id=id)["status"]["state"]
if mm_state.lower() in ["init", "error", "ready"]:
break
else:
print("ModelMonitor {} - is in {} state".format(id, mm_state))
time.sleep(self.wait_interval)
return response
[docs] def modelmonitor_update_schema(
self,
id=None,
label=None,
selected=True,
schema_class="Categorical",
schema_type="InputFeature",
):
"""
Method to update the schema in the modelmonitor
*Available in DKube Release: 3.0*
*Inputs*
id
Modelmonitor Id
label
feature in the schema to be updated
selected
boolean (True or False), by default True
schema_class
class of the schema (Categorical, Continuous)
schema_type
type of the schema (InputFeature, PredictionOutput, Rowid, Timestamp)
*Outputs*
a dictionary object with response status
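*Example* (a minimal sketch; the feature label is hypothetical)::
dapi.modelmonitor_update_schema(id=mm_id, label="age", schema_class="Continuous", schema_type="InputFeature")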
"""
if not label:
print("Specify a valid column label")
return None
found = False
config = self.modelmonitor_get(id=id)
try:
for feature in config["schema"]["features"]:
if feature["label"] == label:
feature["_class"] = schema_class
feature["type"] = schema_type
feature["selected"] = selected
found = True
if not found:
print("specified label is not in the derived schema")
return None
for d in config["schema"]["features"]:
d["class"] = d.pop("_class")
mm = DkubeModelmonitor(
model_name=config["model"], description=config["description"]
)
mm.__dict__["modelmonitor"].__dict__["_schema"] = config["schema"]
return self.modelmonitor_update(id, mm)
except TypeError:
print("Schema is Null")
return