from __future__ import print_function
import json
import sys
import time
from enum import Enum
from pprint import pprint
from dkube.sdk.internal import dkube_api
from dkube.sdk.internal.dkube_api.models.modelmonitor_alert_cond_def import (
ModelmonitorAlertCondDef,
)
from dkube.sdk.internal.dkube_api.models.modelmonitor_alert_def import (
ModelmonitorAlertDef,
)
from dkube.sdk.internal.dkube_api.models.modelmonitor_component_def import (
ModelmonitorComponentDef,
)
from dkube.sdk.internal.dkube_api.models.modelmonitor_dataset_def import (
ModelmonitorDatasetDef,
)
from dkube.sdk.internal.dkube_api.models.modelmonitor_def import ModelmonitorDef
from dkube.sdk.internal.dkube_api.models.modelmonitor_default_threshold_def import (
ModelmonitorDefaultThresholdDef,
)
from dkube.sdk.internal.dkube_api.models.modelmonitor_features_spec_def import (
ModelmonitorFeaturesSpecDef,
)
from dkube.sdk.internal.dkube_api.models.modelmonitor_schema_feature import (
ModelmonitorSchemaFeature,
)
from dkube.sdk.internal.dkube_api.models.modelmonitor_status_def import (
ModelmonitorStatusDef,
)
from .util import *
class DatasetClass(Enum):
    """
    Dataset classes that are supported by the DKube modelmonitor.

    Both ``str()`` and ``repr()`` of a member return the member's plain
    string value, so members can be dropped directly into formatted text.

    *Available in DKube Release: 3.0*
    """

    TrainData = "TrainData"
    PredictData = "PredictData"
    LabelledData = "LabelledData"

    def __repr__(self):
        return self.value

    def __str__(self):
        return self.value
class ModelFrameworks(Enum):
    """
    Model frameworks that are supported by the DKube modelmonitor.

    Both ``str()`` and ``repr()`` of a member return the member's plain
    string value.

    *Available in DKube Release: 3.0*
    """

    Tensorflow1x = "Tensorflow-1x"
    Tensorflow2x = "Tensorflow-2x"
    PyTorch = "PyTorch"
    SkLearn = "SkLearn"
    Other = "Other"

    def __repr__(self):
        return self.value

    def __str__(self):
        return self.value
class AlertClass(Enum):
    """
    Alert classes that are supported by the DKube modelmonitor.

    Both ``str()`` and ``repr()`` of a member return the member's plain
    string value.

    *Available in DKube Release: 3.0*
    """

    FeatureDrift = "FeatureDrift"
    PerformanceDecay = "PerformanceDecay"
    PredictionDrift = "PredictionDrift"

    def __repr__(self):
        return self.value

    def __str__(self):
        return self.value
class ModelType(Enum):
    """
    Model types that are supported by the DKube modelmonitor.

    Both ``str()`` and ``repr()`` of a member return the member's plain
    string value.

    *Available in DKube Release: 3.0*
    """

    Regression = "Regression"
    Classification = "Classification"

    def __repr__(self):
        return self.value

    def __str__(self):
        return self.value
class ModelCategory(Enum):
    """
    Model categories that are supported by the DKube modelmonitor.

    Both ``str()`` and ``repr()`` of a member return the member's plain
    string value.

    *Available in DKube Release: 3.0*
    """

    AutoEncoder = "AutoEncoder"
    TimeSeries = "TimeSeries"
    Other = "Other"

    def __repr__(self):
        return self.value

    def __str__(self):
        return self.value
class DriftAlgo(Enum):
    """
    Drift detection algorithms that are supported by the DKube modelmonitor.

    Both ``str()`` and ``repr()`` of a member return the member's plain
    string value.

    *Available in DKube Release: 3.0*
    """

    KS = "Kolmogorov-Smirnov"
    ChiSquared = "Chi Squared"
    # NOTE(review): this value ends with a trailing space, whereas the default
    # used by DkubeModelmonitor.update_modelmonitor is the same text without
    # it. The value is kept unchanged here; confirm which spelling the backend
    # expects before normalising either one.
    KSChiSquared = "Kolmogorov-Smirnov & Chi Squared "
    Auto = "Auto"

    def __repr__(self):
        return self.value

    def __str__(self):
        return self.value
class DkubeModelmonitor(object):
    """
    This class defines the DKube Modelmonitor with helper functions to set
    properties of the modelmonitor. ::

        from dkube.sdk import *
        mm = DkubeModelmonitor(name="mm", model_name='insurancemodel:ocdkube')

    Where the first argument is the name of the modelmonitor and the second
    argument is the name of the model that you want to monitor, i.e.
    ``nameofmodel:user``. ``user`` should be a valid onboarded user in dkube.

    *Available in DKube Release: 3.0*
    """

    # Annotations in this class are quoted (PEP 484 forward references) so
    # they are not evaluated when the class body is executed.

    def __init__(self, name=None, model_name="", description="", tags=None):
        """
        Build an empty modelmonitor definition and fill in the basic fields.

        name : name of the modelmonitor; when omitted (``None``) a name is
            produced by ``generate("mm")`` at call time,
        model_name : name of the model to monitor,
        description : free-form description,
        tags : tags for the modelmonitor (normalised by ``list_of_strs``).
        """
        # A call in the signature default would run only once, when the class
        # is defined, and every default-named monitor would share that value.
        if name is None:
            name = generate("mm")

        self.schema = {}
        self.default_thresholds = [
            ModelmonitorDefaultThresholdDef(
                id=None, type="performance_threshold", threshold=0, percent_threshold=0
            )
        ]
        self.train_metrics = None
        self.datasets = []
        self.alerts = []
        # The lists above are handed to the definition object itself, so
        # add_dataset/add_alert appending to modelmonitor.datasets/alerts also
        # grows self.datasets/self.alerts as long as the model keeps the
        # reference it was given.
        self.modelmonitor = ModelmonitorDef(
            id=None,
            schema=None,
            pipeline_component=None,
            owner=None,
            emails=None,
            name=None,
            description=None,
            tags=None,
            model=None,
            version=None,
            endpoint_url=None,
            model_type=None,
            model_framework=None,
            model_category=None,
            drift_detection_run_frequency_hrs=None,
            drift_detection_algorithm=None,
            performance_metrics_template=None,
            default_thresholds=self.default_thresholds,
            datasets=self.datasets,
            alerts=self.alerts,
        )
        self.update_basic(name, model_name, description, tags)

    def update_basic(self, name, model_name, description, tags):
        """
        Method to update the attributes specified at creation. Description and
        tags can be updated. tags is a list of string values.

        Returns ``self``.
        """
        tags = list_of_strs(tags)
        self.name = name
        self.modelmonitor.name = name
        self.modelmonitor.description = description
        self.modelmonitor.model = model_name
        self.modelmonitor.tags = tags
        return self

    def update_modelmonitor(
        self,
        model_type: "ModelType" = None,
        model_category: "ModelCategory" = None,
        model_framework: "ModelFrameworks" = None,
        version=None,
        run_freq=None,
        drift_algo: "DriftAlgo" = None,
        emails=None,
        train_metrics=None,
    ):
        """
        This function updates the DKube Modelmonitor configuration. The
        following updates are supported:

            model type,
            model category,
            model framework,
            drift detection algorithm,
            run frequency,
            train metrics,
            emails,
            model version.

        model type, model category, model framework, drift detection algorithm
        and run frequency are always written: when one of them is left as
        ``None`` it is set to its default ("Regression", "TimeSeries",
        "Tensorflow-1x", "Kolmogorov-Smirnov & Chi Squared" and 1
        respectively). train metrics, model version and emails are only
        written when a truthy value is passed.
        """
        self.update_model_type("Regression" if model_type is None else model_type)
        self.update_model_category(
            "TimeSeries" if model_category is None else model_category
        )
        self.update_model_framework(
            "Tensorflow-1x" if model_framework is None else model_framework
        )
        self.update_drift_detection_algorithm(
            "Kolmogorov-Smirnov & Chi Squared" if drift_algo is None else drift_algo
        )
        self.update_run_frequency(1 if run_freq is None else run_freq)

        if train_metrics:
            self.update_train_metrics(train_metrics)
        if version:
            self.update_model_version(version)
        if emails:
            self.update_emails(emails)

    def update_model_type(self, model_type=None):
        """
        Method to update the type of the model, check the Enum Class ModelType
        for the possible values
        """
        self.modelmonitor.model_type = model_type

    def update_model_category(self, category=None):
        """
        Method to update the category of the model, check the Enum Class
        ModelCategory for the possible values
        """
        self.modelmonitor.model_category = category

    def update_model_framework(self, framework=None):
        """
        Method to update the framework, check the Enum Class ModelFrameworks
        for the possible values
        """
        self.modelmonitor.model_framework = framework

    def update_drift_detection_algorithm(self, algorithm=None):
        """
        Method to update the drift detection algorithm, check the Enum Class
        DriftAlgo for the possible values
        """
        self.modelmonitor.drift_detection_algorithm = algorithm

    def update_run_frequency(self, frequency=None):
        """
        Method to update the drift detection run frequency (stored in
        ``drift_detection_run_frequency_hrs``)
        """
        self.modelmonitor.drift_detection_run_frequency_hrs = frequency

    def update_train_metrics(self, train_metrics=None):
        """
        Method to update the train metrics
        """
        self.modelmonitor.train_metrics = train_metrics

    # The method was originally published under this misspelled name while
    # update_modelmonitor called the correct spelling; keep the old name as an
    # alias so existing callers continue to work.
    update_train_merics = update_train_metrics

    def update_model_version(self, version=None):
        """
        Method to update the model version
        """
        self.modelmonitor.version = version

    def update_emails(self, emails=None):
        """
        Method to update the emails
        """
        self.modelmonitor.emails = emails

    def add_dataset(
        self,
        name,
        data_class: "DatasetClass" = None,
        data_format: "DatasetFormat" = "csv",
        version=None,
        s3_subpath=None,
        gt_col=None,
        predict_col=None,
        sql_query=None,
    ):
        """
        This function is for adding the dataset in the model monitor.

            name : name of the dataset,
            data_class : see the Enum DatasetClass,
            data_format : format of the dataset, see the Enum class DatasetFormat,
            version : version of the dataset,
            s3_subpath : s3_subpath of the dataset,
            gt_col : groundtruth column of Labelled Dataset,
            predict_col : predict column of Predict dataset,
            sql_query : sql query in case of sql dataset.
        """
        mm_dataset = ModelmonitorDatasetDef(
            id=None,
            _class=data_class,
            transformer_script=None,
            name=name,
            sql_query=sql_query,
            s3_subpath=s3_subpath,
            version=version,
            data_format=data_format,
            groundtruth_col=gt_col,
            predict_col=predict_col,
            created_at=None,
            updated_at=None,
        )
        self.modelmonitor.datasets.append(mm_dataset)

    def add_alert(self, name, alert_class, feature=None, threshold=None):
        """
        This function is for adding the alert in the model monitor after
        adding train and predict datasets.

            name : name of the alert,
            alert_class : see the Enum AlertClass,
            feature : name of the feature for which alert needs to be added,
            threshold : threshold for the feature.

        The alert gets a single condition ``feature > threshold``.
        """
        # Kept as an instance attribute (not a local) because to_JSON
        # serialises the instance __dict__; it always holds the conditions of
        # the most recently added alert.
        self.conditions = [
            ModelmonitorAlertCondDef(feature=feature, op=">", threshold=threshold)
        ]
        mm_alert = ModelmonitorAlertDef(
            _class=alert_class, email=None, name=name, conditions=self.conditions
        )
        self.modelmonitor.alerts.append(mm_alert)

    @staticmethod
    def _json_default(obj):
        """Fallback JSON encoder: Enum members by value, other objects by ``__dict__``."""
        # An Enum member's __dict__ references its class, which json cannot
        # encode, so members are emitted as their plain value instead.
        if isinstance(obj, Enum):
            return obj.value
        return obj.__dict__

    def to_JSON(self):
        """Return the instance attributes, recursively, as a JSON string."""
        return json.dumps(self, default=self._json_default)
class DkubeModelmonitordataset(object):
    """
    This class defines the DKube Modelmonitor dataset with helper functions to
    set properties of the modelmonitor dataset. ::

        from dkube.sdk import *
        mm = DkubeModelmonitordataset(name="mm-data:ocdkube")

    Where the first argument is the name of the modelmonitor dataset.

    *Available in DKube Release: 3.0*
    """

    # Annotations in this class are quoted (PEP 484 forward references) so
    # they are not evaluated when the class body is executed.

    def __init__(self, name=None):
        """
        Create a dataset description with every property unset.

        name : name of the dataset; when omitted (``None``) a name is produced
            by ``generate("mm-data")`` at call time.
        """
        # A call in the signature default would run only once, when the class
        # is defined, and every default-named dataset would share that value.
        if name is None:
            name = generate("mm-data")

        self._class = None
        self.transformer_script = None
        self.name = name
        self.sql_query = None
        self.s3_subpath = None
        self.version = None
        self.data_format = None
        self.groundtruth_col = None
        self.predict_col = None

    @staticmethod
    def _json_default(obj):
        """Fallback JSON encoder: Enum members by value, other objects by ``__dict__``."""
        # An Enum member's __dict__ references its class, which json cannot
        # encode, so members are emitted as their plain value instead.
        if isinstance(obj, Enum):
            return obj.value
        return obj.__dict__

    def to_JSON(self):
        """Return the instance attributes as a JSON string."""
        return json.dumps(self, default=self._json_default)

    def update_dataset(
        self,
        data_name=None,
        data_class: "DatasetClass" = None,
        data_format: "DatasetFormat" = "csv",
        transformer_script=None,
        sql_query=None,
        groundtruth_col=None,
        predict_col=None,
        s3_subpath=None,
        version=None,
    ):
        """
        This function updates the DKube Modelmonitor dataset. The following
        updates are supported:

            dataset name,
            data class,
            transformer script,
            sql query in case of sql dataset,
            groundtruth column of Labelled Dataset,
            predict column of Predict dataset,
            format of the dataset,
            s3 subpath,
            dataset version.

        The data format is always written ("csv" when ``data_format`` is
        ``None``); every other property is only written when a truthy value is
        passed.
        """
        if data_name:
            self.update_data_name(data_name)
        if data_class:
            self.update_data_class(data_class)
        if transformer_script:
            self.update_transformer_script(transformer_script)
        if sql_query:
            self.update_sql_query(sql_query)
        if groundtruth_col:
            self.update_groundtruth_col(groundtruth_col)
        if predict_col:
            self.update_predict_col(predict_col)

        self.update_data_format("csv" if data_format is None else data_format)

        if s3_subpath:
            self.update_s3_subpath(s3_subpath)
        if version:
            self.update_dataset_version(version)

    def update_data_name(self, data_name=None):
        """
        Method to update the dataset name in the model monitor
        """
        self.name = data_name

    def update_data_class(self, data_class=None):
        """
        Method to update the class of the dataset
        """
        self._class = data_class

    def update_transformer_script(self, transformer_script=None):
        """
        Method to update the transformer script of the dataset
        """
        self.transformer_script = transformer_script

    def update_data_format(self, data_format=None):
        """
        Method to update the format of the dataset
        """
        self.data_format = data_format

    def update_sql_query(self, sql_query=None):
        """
        Method to update sql query in case of sql dataset
        """
        self.sql_query = sql_query

    def update_groundtruth_col(self, groundtruth_col=None):
        """
        Method to update the groundtruth column for Labelled Dataset
        """
        self.groundtruth_col = groundtruth_col

    def update_predict_col(self, predict_col=None):
        """
        Method to update the predict column for the Predict Dataset
        """
        self.predict_col = predict_col

    def update_s3_subpath(self, path=None):
        """
        Method to update s3 subpath of the dataset
        """
        self.s3_subpath = path

    def update_dataset_version(self, version=None):
        """
        Method to update the dataset version
        """
        self.version = version
class DkubeModelmonitoralert(object):
    """
    This class defines the DKube Modelmonitor alert with helper functions to
    set properties of the modelmonitor alert. ::

        from dkube.sdk import *
        mm = DkubeModelmonitoralert(name="mm-alert")

    Where the first argument is the name of the alert in the modelmonitor.

    *Available in DKube Release: 3.0*
    """

    def __init__(self, name="mm-alert"):
        """Create an alert called ``name`` with no class, email or conditions."""
        self.id = None
        self._class = None
        self.name = name
        self.email = None
        self.conditions = []

    @staticmethod
    def _json_default(obj):
        """Fallback JSON encoder: Enum members by value, other objects by ``__dict__``."""
        # An Enum member's __dict__ references its class, which json cannot
        # encode, so members are emitted as their plain value instead.
        if isinstance(obj, Enum):
            return obj.value
        return obj.__dict__

    def to_JSON(self):
        """Return the instance attributes as a JSON string."""
        return json.dumps(self, default=self._json_default)

    # The annotation is quoted (PEP 484 forward reference) so it is not
    # evaluated when the class body is executed.
    def update_alert(
        self,
        email=None,
        alert_class: "AlertClass" = "FeatureDrift",
        feature=None,
        metric=None,
        threshold=None,
        percent_threshold=None,
    ):
        """
        This function updates the alert in the model monitor. The following
        updates are supported:

            email,
            alert_class,
            feature,
            metric,
            threshold,
            percent_threshold

        ``id`` is reset to ``None``, ``email`` and the alert class are
        overwritten, and one new condition (with operator ">") is appended to
        ``conditions`` on every call; existing conditions are kept.
        """
        self.id = None
        self._class = alert_class
        self.email = email
        self.conditions.append(
            {
                "id": None,
                "feature": feature,
                "metric": metric,
                "op": ">",
                "threshold": threshold,
                "percent_threshold": percent_threshold,
            }
        )