from __future__ import print_function
import json
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from dkube.sdk.internal import dkube_api
from dkube.sdk.internal.dkube_api.models.feature_set_input_def import \
from import ApiException
from .util import *
class DkubeFeatureSet(object):
    """
    This class defines the DKube featureset with helper functions to set properties of featureset.::

        from dkube.sdk import *
        mnist = DkubeFeatureSet(name="mnist-fs")

    *Available in DKube Release: 2.2*
    """

    # NOTE(review): the default for ``name`` is computed by ``generate("featureset")``
    # once, when this ``def`` statement is executed, not on every call. Every instance
    # created without an explicit name therefore receives the same default value.
    # The signature is kept as-is for backward compatibility.
    def __init__(self, name=generate("featureset"), tags=None, description=None, path=None, config_file="/opt/dkube/conf/conf.json"):
        """
        Create the underlying ``FeatureSetInputDef`` and populate it.

        *Inputs*

            name
                Name of the featureset.

            tags
                Tags to store on the featureset definition.

            description
                Description to store on the featureset definition.

            path
                Filepath of the feature specification file (see ``update_featurespec_file``).

            config_file
                Path of the configuration file, stored as ``self.CONFIG_FILE``.
        """
        self.featureset = FeatureSetInputDef(
            name=None, tags=None, description=None)
        self.update_basic(name, description, tags, config_file)
        # Record the featurespec path so ``self.featurespec_path`` always exists,
        # even when the caller never calls update_featurespec_file() explicitly.
        self.update_featurespec_file(path)

    def update_basic(self, name, description, tags, config_file):
        """
        Set the basic properties of the featureset.

        Arguments that are ``None`` leave the corresponding property untouched;
        ``config_file`` is always stored.
        """
        if name is not None:
            self.featureset.name = name
        if description is not None:
            self.featureset.description = description
        if tags is not None:
            self.featureset.tags = tags
        self.CONFIG_FILE = config_file

    def update_featurespec_file(self, path=None):
        """
        Method to update the filepath for feature specification metadata

        *Inputs*

            path
                A valid filepath. The file should be a YAML file describing a 'Name', 'Description', 'Schema' for each feature.
        """
        self.featurespec_path = path

    def upload_featurespec(self):
        """
        Placeholder for uploading the feature specification; currently does nothing.

        NOTE(review): no implementation is visible for this method - confirm
        whether it is intentionally a no-op.
        """
        pass

    @classmethod
    def read_features(cls, path):
        """
        Method to read features from the specified path

        *Inputs*

            path
                A valid filepath.

        *Outputs*

            features DataFrame object
        """
        assert path, "path should be specified"
        # The second element of the result (the "is mounted" flag) is not needed here.
        df, _ = DKubeFeatureSetUtils().features_read(name=None, path=path)
        return df

    @classmethod
    def write_features(cls, df, path):
        """
        Method to write features at the specified path

        *Inputs*

            df
                features DataFrame object

            path
                A valid filepath.
        """
        # Check for None first so a missing dataframe fails with the intended
        # assertion message instead of an AttributeError on ``df.empty``.
        assert (df is not None and not df.empty and path), "Both df and path should be specified"
        DKubeFeatureSetUtils().features_write(name=None, dataframe=df, path=path)
class DKubeFeatureSetUtils:
    """Helper routines to validate, locate, read and write featureset data."""

    # Job configuration file consulted to resolve featureset mount points.
    _JOB_CONFIG_PATH = "/etc/dkube/config.json"

    def _load_job_config(self):
        """Return the parsed job config.json as a dict, or None when the file does not exist."""
        if not os.path.exists(self._JOB_CONFIG_PATH):
            return None
        with open(self._JOB_CONFIG_PATH) as fp:
            return json.load(fp)

    def _iter_featuresets(self, config, type):
        """Yield every featureset entry listed under ``config[type]``.

        Records that carry no 'featureset' entry are skipped.
        """
        entries = config.get(type, None)
        if entries is None:
            return
        for rec in entries:
            fsets = rec.get('featureset', None)
            if fsets is None:
                continue
            for fset in fsets:
                yield fset

    def validate_features(self, dataframe=None, featurespec=None) -> bool:
        """
        Method to validate features data against features specification metadata

        *Inputs*

            dataframe
                Panda's dataframe object with features data. This should conform to the feature specification metadata.

            featurespec
                Iterable of dicts, each providing a 'name' and a 'schema' entry.

        *Outputs*

            True when the dataframe and the featurespec have the same number of
            features, the same feature names and the same schema (dtype string)
            for every feature; False otherwise, or when either input is None.
        """
        if featurespec is None or dataframe is None:
            return False

        # Parse featurespec: list of feature names plus a name -> schema map.
        spec_names = []
        spec_schema = {}
        for entry in featurespec:
            feature = str(entry['name'])
            spec_names.append(feature)
            spec_schema[feature] = str(entry['schema'])

        # Feature names and schema as found in the dataframe.
        columns = [str(col) for col in dataframe.keys()]
        dtypes = [str(dtype) for dtype in dataframe.dtypes.to_list()]
        column_schema = dict(zip(columns, dtypes))

        # The number of features should be the same.
        if len(spec_names) != len(columns):
            return False
        for col in columns:
            # The feature names should be the same.
            if col not in spec_schema:
                return False
            # The feature schema should be the same.
            if column_schema[col] != spec_schema[col]:
                return False
        return True

    def compute_features_metadata(self, df, fspec):
        """
        Prepare the featurespec - name, description, schema - for each feature.

        *Inputs*

            df
                Dataframe with the features data.

            fspec
                List of featurespec dicts already known (may be None or empty).

        *Outputs*

            ``fspec`` itself when it already has one entry per dataframe column;
            otherwise a new list with an auto-generated entry (description None,
            schema taken from the column dtype) for every column not named in
            ``fspec``, followed by the entries of ``fspec``.
        """
        if fspec is None:
            fspec = []
        keys = df.keys()
        if len(fspec) == len(keys):
            return fspec

        known_names = {str(each_spec['name']) for each_spec in fspec}
        generated = []
        for key, dtype in zip(keys, df.dtypes.to_list()):
            if str(key) in known_names:
                continue
            generated.append({
                "name": str(key),
                "description": None,
                "schema": str(dtype),
            })
        return generated + fspec

    def _get_featureset_mount_path(self, name, config, type):
        """
        Return the mounted path for the featureset, or None when not found.

        name   - featureset name
        config - config.json in dict format
        type   - search in 'outputs' or 'inputs'
        """
        for fset in self._iter_featuresets(config, type):
            if name == fset['name']:
                return fset['location']
        return None

    def _get_featureset_name(self, path, config, type):
        """
        Return the mounted featureset name, given the mount point, or None when not found.

        path   - featureset mount path (matched against 'location' and 'dkube_path')
        config - config.json in dict format
        type   - search in 'outputs' or 'inputs'
        """
        for fset in self._iter_featuresets(config, type):
            if path == fset['location'] or path == fset['dkube_path']:
                return fset['name']
        return None

    def _get_d3_full_path(self, rel_path):
        """
        Get the full path to the dkube store for a store-relative path.

        The first 'users' occurrence in ``rel_path`` is replaced by 'home' and
        the result is joined to the DKUBE_DATA_BASE_PATH environment variable.
        Raises TypeError when that variable is not set.
        """
        rel_path = rel_path.replace('users', 'home', 1)
        base = os.getenv("DKUBE_DATA_BASE_PATH")
        return os.path.join(base, rel_path)

    def _get_d3_path_from_mountpoint(self, path):
        """
        Return the mount source of ``path`` relative to "/dkube", or None when not mounted.

        The source is read from /proc/mounts; for a "host:dir" style source only
        the part after the first ':' is used. For example this may return
        featuresets/train-fs-1936/1610480719061/data for a featureset mounted at /fset.
        """
        src = None
        # Use a context manager so the file handle is always released.
        with open("/proc/mounts", "r") as mounts:
            for line in mounts:
                fields = line.split(" ")
                # No early exit: when several entries share the mount point the last one wins.
                if len(fields) > 1 and path == fields[1]:
                    src = fields[0]
        if src is not None:
            mnt_fields = src.split(":")
            if len(mnt_fields) > 1:
                src = mnt_fields[1]
            else:
                src = mnt_fields[0]
            src = os.path.relpath(src, "/dkube")
        return src

    def _get_d3_rel_path(self, path, config, type):
        """
        Return the storage path, relative to "dkube", of the featureset mounted at ``path``.

        path   - featureset mount path (trailing '/' is ignored when matching)
        config - config.json in dict format
        type   - search in 'outputs' or 'inputs'

        Returns None when no featureset matches.
        """
        path = path.rstrip("/")
        for fset in self._iter_featuresets(config, type):
            if path == fset['location'].rstrip("/") or path == fset['dkube_path'].rstrip("/"):
                return os.path.relpath(fset['storage_path'], "dkube")
        return None

    def get_rel_path_for_commit(self, full_path):
        """
        Return the path to commit for ``full_path``.

        The job config.json is consulted first (don't go by mount path). When
        the path is not listed there, the path relative to DKUBE_DATA_BASE_PATH
        is used, with its first 'home' occurrence replaced by 'users'.
        """
        relpath = None
        dkube_config = self._load_job_config()
        if dkube_config is not None:
            relpath = self._get_d3_rel_path(full_path, dkube_config, 'outputs')
        if relpath is None:
            # Not found in config.json: get the relative path from the dkube store.
            base = os.getenv("DKUBE_DATA_BASE_PATH")
            relpath = os.path.relpath(full_path, base)
            relpath = relpath.replace('home', 'users', 1)
        return relpath

    def get_top_version(self, versions):
        """
        Get the latest version label ('v<N>').

        Shortcut for now: N is the length of the ``versions`` list.
        """
        return 'v{}'.format(len(versions))

    def validate_version(self, versions, version):
        """
        Tell whether the specified version ('v<N>') exists in ``versions``.

        Raises ValueError when the text after the first character is not an integer.
        """
        index = int(version[1:])
        return 1 <= index <= len(versions)

    def get_version_status(self, versions, version):
        """
        Get the state of the specified version ('v<N>').

        Returns 'INVALID' when N is outside 1..len(versions); otherwise the
        'state' of the N-th entry after ordering ``versions`` by their
        ``version['index']`` (missing index counts as 0).
        """
        index = int(version[1:])
        # Out of range on either side is invalid ('or', not 'and').
        if index < 1 or index > len(versions):
            return 'INVALID'
        ordered = sorted(versions, key=lambda k: k['version'].get('index', 0), reverse=False)
        return ordered[index - 1]['version']['state']

    def get_featureset_name_from_mountpath(self, path, type):
        """
        Return the name of the featureset mounted at ``path`` according to the job
        config.json, or None when the file is absent or nothing matches.

        type - search in 'outputs' or 'inputs'
        """
        dkube_config = self._load_job_config()
        if dkube_config is None:
            return None
        return self._get_featureset_name(path, dkube_config, type)

    def get_featureset_mountpath_from_name(self, name, type):
        """
        Return the mount path of featureset ``name`` according to the job
        config.json, or None when the file is absent or nothing matches.

        type - search in 'outputs' or 'inputs'
        """
        dkube_config = self._load_job_config()
        if dkube_config is None:
            return None
        return self._get_featureset_mount_path(name, dkube_config, type)

    def features_write(self, name, dataframe, path=None, dftype="Py") -> str:
        """
        Method to write features

        *Inputs*

            name
                Featureset name

            dataframe
                Panda's dataframe object with features data. This should conform to the feature specification metadata.

            path
                This is optional. If not specified it derives the path from /etc/dkube/config.json.

            dftype
                Only "Py" dataframes are written to disk.

        *Outputs*

            path - where the features are written (relative path for commit),
            or None when no target path can be determined or the write fails.
        """
        filename = 'featureset.parquet'

        if path is None:
            # Get the path from the job configuration.
            dkube_config = self._load_job_config()
            if dkube_config is not None:
                path = self._get_featureset_mount_path(name, dkube_config, 'outputs')

            if path is None:
                dkube_path = os.getenv('DKUBE_USER_STORE')
                # NOTE(review): the job uuid source is not visible in this file;
                # DKUBE_JOB_UUID is assumed here - confirm against the runtime environment.
                job_uuid = os.getenv('DKUBE_JOB_UUID')
                if dkube_path is None or job_uuid is None or name is None:
                    return None
                featureset_folder = os.path.join('gen/outputs/', job_uuid, name)
                path = os.path.join(dkube_path, featureset_folder)
                os.makedirs(path, exist_ok=True)

        if (dftype == "Py") and dataframe is not None and (not dataframe.empty):
            target = os.path.join(path, filename)
            # Try writing 2 times.
            # After commit, the parquet file becomes read-only.
            for attempt in range(2):
                try:
                    table = pa.Table.from_pandas(dataframe)
                    pq.write_table(table, target)
                    break  # written successfully, no retry needed
                except Exception as e:
                    print("features_write: write failed {}".format(str(e)))
                    if attempt == 1:
                        return None

        # Get the path relative to DKube base
        path = self.get_rel_path_for_commit(path)
        return path

    def features_read(self, name, path=None) -> (pd.DataFrame, bool):
        """
        Method to read features

        *Inputs*

            name
                featureset name

            path
                This is optional. It points to where featureset is mounted

        *Outputs*

            Dataframe with features, and a flag telling whether the path was
            resolved from the job config.json (featureset is mounted). An empty
            dataframe is returned when no path is found or the read fails.
        """
        filename = 'featureset.parquet'
        df_empty = pd.DataFrame({'A': []})
        is_mounted = False

        if path is None:
            # Get the path from the job configuration.
            dkube_config = self._load_job_config()
            if dkube_config is not None:
                path = self._get_featureset_mount_path(name, dkube_config, 'inputs')
                if path is not None:
                    is_mounted = True

        if path is None:
            return df_empty, is_mounted

        try:
            table = pq.read_table(os.path.join(path, filename))
            feature_df = table.to_pandas()
            return feature_df, is_mounted
        except Exception as e:
            # Best effort: report the failure and hand back an empty dataframe.
            print("features_read: read failed {}".format(str(e)))
            return df_empty, is_mounted