Source code for dkube.sdk.rsrcs.featureset

from __future__ import print_function

import json
import os

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from dkube.sdk.internal import dkube_api
from dkube.sdk.internal.dkube_api.models.feature_set_input_def import \
from import ApiException

from .util import *

[docs]class DkubeFeatureSet(object): """ This class defines the DKube featureset with helper functions to set properties of featureset.:: from dkube.sdk import * mnist = DkubeFeatureSet(name="mnist-fs") *Available in DKube Release: 2.2* """ def __init__(self, name=generate("featureset"), tags=None, description=None, path=None, config_file="/opt/dkube/conf/conf.json"): self.featureset = FeatureSetInputDef( name=None, tags=None, description=None) self.update_basic(name, description, tags, config_file) self.update_featurespec_file(path) def update_basic(self, name, description, tags, config_file): if name is not None: = name if description is not None: self.featureset.description = description if tags is not None: self.featureset.tags = tags self.CONFIG_FILE = config_file
[docs] def update_featurespec_file(self, path=None): """ Method to update the filepath for feature specification metadata *Inputs* path A valid filepath. The file should be an YAML file describing a 'Name', 'Description', 'Schema' for each feature. """ self.featurespec_path = path
def upload_featurespec(self): pass
[docs] @classmethod def read_features(cls, path): """ Method to read features from the specified path *Inputs* path A valid filepath. *Outputs* df features DataFrame object """ assert(path), "path should be specified" df, _ = DKubeFeatureSetUtils().features_read(name=None, path=path) return df
[docs] @classmethod def write_features(cls, df, path): """ Method to write features at the specified path *Inputs* df features DataFrame object path A valid filepath. """ assert(not df.empty and path), "Both df and path should be specified" DKubeFeatureSetUtils().features_write(name=None, dataframe=df, path=path)
class DKubeFeatureSetUtils: def validate_features(self, dataframe=None, featurespec=None) -> bool: """ Method to validate features data against features specification metadata *Inputs* dataframe Panda's dataframe object with features data. This should confirm to the feature specification metadata. featurespec Dictionary """ if featurespec is None or dataframe is None: return False f_spec = featurespec # Parse featurespec. # - Create a list of feature names # - Create a map of feature name and schema fspec_dic = {} fspec_keys = [] for each_spec in f_spec: fspec_dic[str(each_spec['name'])] = str(each_spec['schema']) fspec_keys.append(str(each_spec['name'])) # Get the feature names and schema from the dataframe schema = [str(sma) for sma in dataframe.dtypes.to_list()] df_keys = [str(k) for k in dataframe.keys()] df_spec = {} for i in range(len(df_keys)): df_spec[df_keys[i]] = schema[i] # Validations # - The number of features should be the same # - The feature names should be the same # - The feature schema should be the same. if len(fspec_keys) != len(df_keys): # "No. of columns in dataframe and featurespec are not equal" return False for each_key in df_keys: if each_key not in fspec_keys: # error": "Column name {} not found in featurespec".format(each_key) return False if df_spec[each_key] != fspec_dic[each_key]: # Datatype not matched for column {}".format(each_key)} return False return True def compute_features_metadata(self, df, fspec): # Prepare featurespec - Name, Description, Schema for each feature keys = df.keys() if (len(fspec) == len(keys)): return fspec added_names = [] for each_spec in fspec: added_names.append(each_spec["name"]) schema = df.dtypes.to_list() featureset_metadata = [] for i in range(len(keys)): if str(keys[i]) not in added_names: metadata = {} metadata["name"] = str(keys[i]) metadata["description"] = None metadata["schema"] = str(schema[i]) featureset_metadata.append(metadata) return featureset_metadata + fspec # Convert featureset metadata (featurespec) to yaml # featureset_metadata = yaml.dump(featureset_metadata, default_flow_style=False) # return the mounted path for the featureset def _get_featureset_mount_path(self, name, config, type): # name - featureset name # config - config.json in dict format # type - search in 'outputs' or 'inputs' object = config.get(type, None) if object is None: return None for rec in object: fsets = rec.get('featureset', None) if fsets is None: continue for fset in fsets: if name == fset['name']: return fset['location'] return None # return the mounted featureset name, given the mount point def _get_featureset_name(self, path, config, type): # path - featureset mount path # config - config.json in dict format # type - search in 'outputs' or 'inputs' object = config.get(type, None) if object is None: return None for rec in object: fsets = rec.get('featureset', None) if fsets is None: continue for fset in fsets: if path == fset['location'] or path == fset['dkube_path']: return fset['name'] return None def _get_d3_full_path(self, rel_path): # Get full path to dkube store rel_path = rel_path.replace('users', 'home', 1) base = os.getenv("DKUBE_DATA_BASE_PATH") fullpath = os.path.join(base, rel_path) return (fullpath) ''' def _get_d3_path_from_mountpoint(self, path): # For the following df output, it returns featuresets/train-fs-1936/1610480719061/data # /fset src = None for l in open("/proc/mounts", "r"): tabs = l.split(" ") if(path == tabs[1]): src = tabs[0] break if src is not None: mnt_fields = src.split(":") if len(mnt_fields) > 1: src = mnt_fields[1] else: src = mnt_fields[0] src = os.path.relpath(src, "/dkube") return src ''' def _get_d3_rel_path(self, path, config, type): # path - featureset mount path # config - config.json in dict format # type - search in 'outputs' or 'inputs' object = config.get(type, None) if object is None: return None path = path.rstrip("/") for rec in object: fsets = rec.get('featureset', None) if fsets is None: continue for fset in fsets: if path == fset['location'].rstrip("/") or path == fset['dkube_path'].rstrip("/"): return os.path.relpath(fset['storage_path'], "dkube") return None def get_rel_path_for_commit(self, full_path): # Don't go by mount path. Check config.json try: if os.path.exists("/etc/dkube/config.json"): with open("/etc/dkube/config.json") as fp: dkube_config = json.load(fp) relpath = self._get_d3_rel_path(full_path, dkube_config, 'outputs') except: relpath = None # This might have been created if relpath is None: # Get relative path from dkube store base = os.getenv("DKUBE_DATA_BASE_PATH") relpath = os.path.relpath(full_path, base) relpath = relpath.replace('home', 'users', 1) return (relpath) def get_top_version(self, versions): # Get the latest version # Shortcut for now, get the length of the list and return it top = len(versions) return ('v{}'.format(top)) def validate_version(self, versions, version): # Does the version specified exists? index = int(version[1:]) if index >= 1 and index <= len(versions): return True return False def get_version_status(self, versions, version): # Get the state of the specified version index = int(version[1:]) if index < 1 and index > len(versions): return 'INVALID' versions = sorted(versions, key=lambda k: k['version'].get('index', 0), reverse=False) return versions[index-1]['version']['state'] def get_featureset_name_from_mountpath(self, path, type): try: if os.path.exists("/etc/dkube/config.json"): with open("/etc/dkube/config.json") as fp: dkube_config = json.load(fp) name = self._get_featureset_name(path, dkube_config, type) except: name = None return name def get_featureset_mountpath_from_name(self, name, type): path = None try: if os.path.exists("/etc/dkube/config.json"): with open("/etc/dkube/config.json") as fp: dkube_config = json.load(fp) path = self._get_featureset_mount_path(name, dkube_config, type) except: path = None return path def features_write(self, name, dataframe, path=None, dftype="Py") -> str: """ Method to write features *Inputs* name Featureset name dataframe Panda's dataframe object with features data. This should confirm to the feature specification metadata. path This is optional. If not specified it derives the path from /etc/dkube/config.json. *Outputs* path - where the features are written """ filename = 'featureset.parquet' if path is None: assert(name) # Get the path try: if os.path.exists("/etc/dkube/config.json"): with open("/etc/dkube/config.json") as fp: dkube_config = json.load(fp) path = self._get_featureset_mount_path(name, dkube_config, 'outputs') except: path = None if path is None: dkube_path = os.getenv('DKUBE_USER_STORE') job_uuid=os.getenv("DKUBE_JOB_UUID") if dkube_path is None: return None featureset_folder = os.path.join('gen/outputs/', job_uuid, name) path = os.path.join(dkube_path, featureset_folder) os.makedirs(path, exist_ok=True) # update config.json # _update_featureset_path(name, dkube) # Try writing 2 times # After commit, the parquet file becomes read-only if (dftype == "Py") and (not dataframe.empty): for i in range(2): try: table = pa.Table.from_pandas(dataframe) pq.write_table(table, os.path.join(path, filename)) break except Exception as e: print("features_write: write failed {}".format(str(e))) if i == 1: return None # Get the path relative to DKube base path = self.get_rel_path_for_commit(path) return path def features_read(self, name, path=None) -> (pd.DataFrame, bool): """ Method to read features *Inputs* name featureset name path This is optional. It points to where featureset is mounted *Outputs* Dataframe with features """ filename = 'featureset.parquet' df_empty = pd.DataFrame({'A': []}) is_mounted = False if path is None: # Get the path try: if os.path.exists("/etc/dkube/config.json"): with open("/etc/dkube/config.json") as fp: dkube_config = json.load(fp) path = self._get_featureset_mount_path(name, dkube_config, 'inputs') if path is not None: is_mounted=True except: path = None if path is None: return df_empty, is_mounted try: table = pq.read_table(os.path.join(path, filename)) feature_df = table.to_pandas() return feature_df, is_mounted except Exception as e: return df_empty, is_mounted