from __future__ import print_function
import json
import sys
import time
from enum import Enum
from pprint import pprint
from dkube.sdk.internal import dkube_api
from dkube.sdk.internal.dkube_api.models.modelmonitor_alert_cond_def import (
ModelmonitorAlertCondDef,
)
from dkube.sdk.internal.dkube_api.models.modelmonitor_alert_def import (
ModelmonitorAlertDef,
)
from dkube.sdk.internal.dkube_api.models.modelmonitor_component_def import (
ModelmonitorComponentDef,
)
from dkube.sdk.internal.dkube_api.models.modelmonitor_data_source_def import (
ModelmonitorDataSourceDef,
)
from dkube.sdk.internal.dkube_api.models.modelmonitor_def import ModelmonitorDef
from dkube.sdk.internal.dkube_api.models.modelmonitor_features_spec_def import (
ModelmonitorFeaturesSpecDef,
)
from dkube.sdk.internal.dkube_api.models.modelmonitor_schema_feature import (
ModelmonitorSchemaFeature,
)
from dkube.sdk.internal.dkube_api.models.modelmonitor_status_def import (
ModelmonitorStatusDef,
)
from .util import *
[docs]class SchemaFeatureClass(Enum):
"""
This Enum class defines the feature classes that are suported for the Dkube modelmonitor schema.
*Available in DKube Release: 3.x*
"""
Categorical = "categorical"
Continuous = "continuous"
def __repr__(self):
return self.value
def __str__(self):
return self.value
[docs]class SchemaFeatureType(Enum):
"""
This Enum class defines the feature type that are suported for the Dkube modelmonitor schema.
*Available in DKube Release: 3.x*
"""
InputFeature = "input_feature"
PredictionOutput = "prediction_output"
Timestamp = "timestamp"
RowId = "row_id"
Null = "null"
def __repr__(self):
return self.value
def __str__(self):
return self.value
[docs]class PipelineComponentType(Enum):
"""
This Enum class defines the type that are suported for the Pipeline component of the Dkube modelmonitor.
*Available in DKube Release: 3.x*
"""
Baseline = "baseline"
DataDrift = "data_drift"
PerformanceDrift = "performance_drift"
DeploymentHealth = "deployment_health"
def __repr__(self):
return self.value
def __str__(self):
return self.value
[docs]class Protocol(Enum):
"""
This Enum class defines the protocols that are suported for the metrics in deployment of the Dkube modelmonitor.
*Available in DKube Release: 3.x*
"""
tcp = "tcp"
http = "http"
def __repr__(self):
return self.value
def __str__(self):
return self.value
[docs]class DriftAlgo(Enum):
"""
This Enum class defines the drift detection algorithms that are suported for the Dkube modelmonitor.
*Available in DKube Release: 3.0*
"""
KS = "kolmogorov-smirnov"
ChiSquared = "chi Squared"
KSChiSquared = "kolmogorov-smirnov & chi squared"
Auto = "auto"
def __repr__(self):
return self.value
def __str__(self):
return self.value
[docs]class ModelType(Enum):
"""
This Enum class defines the model type that are suported for the Dkube modelmonitor.
*Available in DKube Release: 3.0*
"""
Regression = "regression"
Classification = "classification"
def __repr__(self):
return self.value
def __str__(self):
return self.value
[docs]class SourceTypeDeployment(Enum):
"""
This Enum class defines the source type that are suported for the Dkube deployment monitoring.
*Available in DKube Release: 3.0*
"""
Metrics = "metrics"
Custom = "custom"
def __repr__(self):
return self.value
def __str__(self):
return self.value
[docs]class DatasetClass(Enum):
"""
This Enum class defines the dataset class that are suported for the Dkube modelmonitor.
*Available in DKube Release: 3.x*
"""
Train = "train"
Predict = "predict"
Labelled = "labelled"
Metrics = "metrics"
def __repr__(self):
return self.value
def __str__(self):
return self.value
[docs]class AlertClass(Enum):
"""
This Enum class defines the alert class that are suported for the Dkube modelmonitor.
*Available in DKube Release: 3.x*
"""
FeatureDrift = "feature_drift"
PerformanceDecay = "performance_decay"
DeploymentHealth = "deployment_health"
def __repr__(self):
return self.value
def __str__(self):
return self.value
[docs]class AlertState(Enum):
"""
This Enum class defines the alert state that are suported for the Dkube modelmonitor.
*Available in DKube Release: 3.0*
"""
Enabled = "enabled"
Disabled = "disabled"
Invalid = "invalid"
def __repr__(self):
return self.value
def __str__(self):
return self.value
[docs]class AlertActionType(Enum):
"""
This Enum class defines the alert action type that are suported for the Dkube modelmonitor.
*Available in DKube Release: 3.0*
"""
Email = "email"
def __repr__(self):
return self.value
def __str__(self):
return self.value
[docs]class DkubeModelmonitor(object):
"""
This class defines the DKube Modelmonitor with helper functions to set properties of modelmonitor.::
from dkube.sdk import *
mm = DkubeModelmonitor(name="mm",model_name='insurancemodel:ocdkube')
Where first argument is the name of the modelmonitor
second argument is the name of the model that you want to monitor i.e. nameofmodel:user
user should be a valid onboarded user in dkube.
*Available in DKube Release: 3.x
"""
def __init__(self, name=generate("mm"), model_type="regression"):
self.alerts = []
self.datasources = {}
self.features = []
self.metrics = {}
self.schema = {}
self.deployment_monitoring = {}
self.drift_monitoring = {}
self.performance_monitoring = {}
self.features.append(
ModelmonitorSchemaFeature(selected=None, _class=None, label=None, type=None)
)
self.status = ModelmonitorStatusDef(
state=None,
sub_state=None,
message=None,
code=None,
success=None,
schema_updated=None,
alerts_updated=None,
)
self.pipeline_component = ModelmonitorComponentDef(
run_id=None, status_info=None, status=None, type=None
)
self.modelmonitor = ModelmonitorDef(
id=None,
status=self.status,
schema=self.schema,
pipeline_component=self.pipeline_component,
deployment_monitoring=self.deployment_monitoring,
drift_monitoring=self.drift_monitoring,
performance_monitoring=self.performance_monitoring,
owner=None,
name=None,
thresholds=None,
model_type=None,
datasources=self.datasources,
alerts=self.alerts,
)
self.update_modelmonitor(name, model_type)
[docs] def update_modelmonitor(
self, name=None,
model_type: ModelType = None,
data_timezone=None,
input_data_type=None,
thresholds=None,
):
"""
Method to update the attributes specified at creation.
"""
if name:
self.modelmonitor.name = name
if data_timezone:
self.modelmonitor.data_timezone = data_timezone
if model_type in ["regression", "classification"]:
self.modelmonitor.model_type = model_type
else:
print(
"Please define the supported model types, regression or classification"
)
if input_data_type:
self.modelmonitor.input_data_type = input_data_type
if thresholds:
self.modelmonitor.thresholds = thresholds
return self
[docs] def add_datasources(
self,
id=None,
data_class: DatasetClass = None,
transformer_script=None,
name=None,
sql_query=None,
s3_subpath=None,
version=None,
data_format=str(DatasetFormat.Tabular),
groundtruth_col=None,
predict_col=None,
date_suffix=None,
timestamp_col=None,
):
"""
This function adds the datasource in dkube Model monitor.
"""
if data_class == None:
print("Please provide a class for which dataset needs to be added")
else:
mm_dataset = {}
if name:
mm_dataset["name"] = name
if id:
mm_dataset["id"] = id
if transformer_script:
mm_dataset["transformer_script"] = transformer_script
if data_format:
mm_dataset["data_format"] = data_format
if version:
mm_dataset["version"] = version
if s3_subpath:
mm_dataset["s3_subpath"] = s3_subpath
if groundtruth_col:
mm_dataset["groundtruth_col"] = groundtruth_col
if predict_col:
mm_dataset["predict_col"] = predict_col
if sql_query:
mm_dataset["sql_query"] = sql_query
if data_class:
mm_dataset["class"] = data_class
if date_suffix:
mm_dataset["date_suffix"] = date_suffix
if data_class == "labelled" and timestamp_col:
mm_dataset["timestamp_col"] = timestamp_col
if data_class not in self.modelmonitor.datasources:
self.modelmonitor.datasources[data_class] = mm_dataset
[docs] def update_datasources(
self,
id=None,
data_class: DatasetClass = None,
transformer_script=None,
name=None,
sql_query=None,
s3_subpath=None,
version=None,
data_format=str(DatasetFormat.Tabular),
groundtruth_col=None,
predict_col=None,
date_suffix=None,
timestamp_col=None,
):
"""
This function updates the DKube Modelmonitor datasource.
"""
if data_class == None:
print("Please provide a class for which dataset needs to be updated")
else:
mm_dataset = {}
if name:
mm_dataset["name"] = name
if id:
mm_dataset["id"] = id
if transformer_script:
mm_dataset["transformer_script"] = transformer_script
if data_format:
mm_dataset["data_format"] = data_format
if version:
mm_dataset["version"] = version
if s3_subpath:
mm_dataset["s3_subpath"] = s3_subpath
if groundtruth_col:
mm_dataset["groundtruth_col"] = groundtruth_col
if predict_col:
mm_dataset["predict_col"] = predict_col
if sql_query:
mm_dataset["sql_query"] = sql_query
if data_class:
mm_dataset["class"] = data_class
if date_suffix:
mm_dataset["date_suffix"] = date_suffix
if data_class == "labelled" and timestamp_col:
mm_dataset["timestamp_col"] = timestamp_col
for key in mm_dataset:
self.modelmonitor.datasources[data_class][key] = mm_dataset[key]
[docs] def update_drift_monitoring_details(
self,
enabled=None,
frequency=None,
algorithm: DriftAlgo = None,
):
"""
This function updates the DKube drift monitor details. The following updates are supported:
enabled : boolean value,
frequency : an integer, frequency for detecting concept drift
algorithm : Drift Algorithm, see the DriftAlgo Enum class for the details,
"""
if enabled:
self.modelmonitor.drift_monitoring["enabled"] = enabled
if frequency:
self.modelmonitor.drift_monitoring["frequency"] = frequency
if algorithm:
self.modelmonitor.drift_monitoring["algorithm"] = algorithm
[docs] def update_deployment_monitoring_details(
self,
enabled=None,
frequency=None,
cluster=None,
source_type: SourceTypeDeployment = None,
metrics=None,
collect_metrics=None,
):
"""
This function updates the DKube deployment monitoring details. The following updates are supported:
enabled : boolean value,
frequency : an integer, frequency for deployment monitoring
cluster: cluster
source_type: SourceType see the DeploymentSourceType Enum class for the details,
metrics:
thresholds : a dictionary containing hard and soft thresholds, eg : { hard:0.02, soft:0.01}
"""
if enabled:
self.modelmonitor.deployment_monitoring["enabled"] = enabled
if frequency:
self.modelmonitor.deployment_monitoring["frequency"] = frequency
if cluster:
self.modelmonitor.deployment_monitoring["cluster"] = cluster
if source_type:
self.modelmonitor.deployment_monitoring["source_type"] = source_type
if collect_metrics:
self.modelmonitor.deployment_monitoring["collect_metrics"] = collect_metrics
[docs] def update_deployment_metrics(
self,
heartbeat=None,
protocol: Protocol = None,
payload=None,
headers=None,
response_status_code=None,
response_body=None,
):
"""
Method to update deployment metrics in performance monitoring
"""
self.modelmonitor.deployment_monitoring["metrics"] = {
"heartbeat": heartbeat,
"protocol": protocol,
"payload": payload,
"headers": headers,
"response_status_code": response_status_code,
"response_body": response_body,
}
[docs]class DkubeModelmonitoralert(object):
"""
This class defines the DKube Modelmonitor alert with helper functions to set properties of modelmonitor alert.::
from dkube.sdk import *
mm = DkubeModelmonitoralert(name="mm-alert")
Where first argument is the name of the alert in the modelmonitor .
*Available in DKube Release: 3.x*
"""
def __init__(self, name="mm-alert", tags=[]):
self.id = None
self._class = None
self.enabled = None
self.name = name
self.tags = tags
self.conditions = []
self.alert_action = {}
self.emails=None
def to_JSON(self):
return json.dumps(self, default=lambda o: o.__dict__)
[docs] def update_alert(
self,
alert_class: AlertClass = "feature_drift",
enabled=None,
tags=None,
feature=None,
metric=None,
threshold=None,
percent_threshold=None,
breach_threshold=None,
emails=None,
action_type="email",
):
"""
This function updates the alert in the model monitor. The following updates are supported.
alert_class,
enabled,
tags,
feature,
metric,
threshold,
percent_threshold,
breach_threshold,
emails
"""
if tags:
self.tags = tags
self.name = self.name
self._class = alert_class
self.enabled = enabled
self.alert_action["action_type"] = "email"
self.conditions.append(
{
"id": None,
"feature": feature,
"metric": metric,
"op": ">",
"threshold": threshold,
"percent_threshold": percent_threshold,
}
)
if breach_threshold:
self.alert_action["breach_threshold"] = breach_threshold
if emails:
self.alert_action["emails"] = emails