from abc import ABC, abstractmethod
import os
import sys
import h5py
import json
import numpy as np
import pandas as pd
import keras
from nnfwtbn.variable import Variable
from nnfwtbn.helpers import python_to_str, str_to_python
class CrossValidator(ABC):
    """
    Abstract class of a cross validation method.

    Subclasses define how the dataset is cut into slices (select_slice) and
    which slices make up the training, validation and test set of each fold.
    """

    def __init__(self, k, mod_var=None, frac_var=None):
        """
        Creates a new cross validator. The argument k determines the number of
        folds. The mod_var specifies a variable whose modulus defines the
        set. The frac_var specifies a variable whose decimals define the
        set. Exactly one of the two must be given. Both options can be either
        a string naming the column in the dataframe or a variable object.

        Raises a TypeError if both or neither of mod_var and frac_var are
        given.
        """
        self.k = k

        if (mod_var is None) == (frac_var is None):
            raise TypeError("Exactly one of mod_var or frac_var must be "
                            "used.")
        elif mod_var is not None:
            self.variable = mod_var
            self.mod_mode = True
        else:
            self.variable = frac_var
            self.mod_mode = False

        # A plain column name is wrapped into a Variable of the same name.
        if isinstance(self.variable, str):
            self.variable = Variable(self.variable, self.variable)

    def __eq__(self, other):
        """
        Compare if two cross validators are the same, i.e. they are of the
        same class and agree in k, the mode and the variable.
        """
        if not isinstance(other, self.__class__):
            return False

        if self.k != other.k:
            return False

        if self.mod_mode != other.mod_mode:
            return False

        if self.variable != other.variable:
            return False

        return True

    @abstractmethod
    def select_slice(self, df, slice_id):
        """
        Returns the index array to select all events from the dataset of a
        given slice.

        NB: This method is for internal usage only. There might be more than k
        slices.
        """

    @abstractmethod
    def select_training(self, df, fold_i):
        """
        Returns the index array to select all training events from the
        dataset for the given fold.
        """

    @abstractmethod
    def select_validation(self, df, fold_i):
        """
        Returns the index array to select all validation events from the
        dataset for the given fold.
        """

    @abstractmethod
    def select_test(self, df, fold_i):
        """
        Returns the index array to select all test events from the dataset
        for the given fold.
        """

    def select_cv_set(self, df, cv, fold_i):
        """
        Returns the index array to select all events from the cross validator
        set specified with cv ('train', 'val', 'test') for the given fold.

        Raises a ValueError if cv is none of the three accepted names.
        """
        if cv not in ['train', 'val', 'test']:
            # The message lists exactly the accepted values.
            raise ValueError("Argument 'cv' must be one of 'train', 'val', "
                             "'test'; but was %s." % repr(cv))

        if cv == "train":
            selected = self.select_training(df, fold_i)
        elif cv == "val":
            selected = self.select_validation(df, fold_i)
        else:
            selected = self.select_test(df, fold_i)

        return selected

    def retrieve_fold_info(self, df, cv):
        """
        Returns an array of integers to specify which event was used
        for train/val/test in which fold. Events which are not part of the
        requested set in any fold are marked with -1. If an event is selected
        in several folds, the last fold wins.
        """
        fold_info = np.full(len(df), -1, dtype=int)

        for fold_i in range(self.k):
            selected = self.select_cv_set(df, cv, fold_i)
            fold_info[selected] = fold_i

        return fold_info

    def save_to_h5(self, path, key, overwrite=False):
        """
        Save cross validator definition to a hdf5 file.

        'path' is the file path and 'key' is the path inside the hdf5 file.
        If overwrite is true then already existing file contents are
        overwritten.
        """
        open_mode = "w" if overwrite else "a"

        with h5py.File(path, open_mode) as output_file:
            group = output_file.create_group(key)
            # np.string_ was removed in NumPy 2.0; np.bytes_ is the same type.
            group.attrs["class_name"] = np.bytes_(self.__class__.__name__)
            group.attrs["k"] = self.k
            group.attrs["mod_mode"] = self.mod_mode

        # NOTE(review): the variable is handed the path, so it presumably
        # opens the file itself; it is therefore stored after our own handle
        # has been closed.
        self.variable.save_to_h5(path, os.path.join(key, "variable"))

    @classmethod
    def load_from_h5(cls, path, key):
        """
        Create a new cross validator instance from an hdf5 file.

        'path' is the file path and 'key' is the path inside the hdf5 file.
        The concrete class is looked up by the stored class name in this
        module.
        """
        with h5py.File(path, "r") as input_file:
            attrs = input_file[key].attrs
            class_name = attrs["class_name"]
            # Convert numpy scalars to plain python values.
            k = int(attrs["k"])
            mod_mode = bool(attrs["mod_mode"])

        # Depending on how the attribute was written it is returned as bytes
        # or as str.
        if isinstance(class_name, bytes):
            class_name = class_name.decode()

        class_object = getattr(sys.modules[__name__], class_name)
        variable = Variable.load_from_h5(path, os.path.join(key, "variable"))

        if mod_mode:
            return class_object(k=k, mod_var=variable)
        else:
            return class_object(k=k, frac_var=variable)
class ClassicalCV(CrossValidator):
    """
    Performs the k-fold cross validation on half of the data set. The other
    half is designated as the test set.

    fold 0: | Tr | Tr | Tr | Tr | Va | Test |
    fold 1: | Tr | Tr | Tr | Va | Tr | Test |
    fold 2: | Tr | Tr | Va | Tr | Tr | Test |
    fold 3: | Tr | Va | Tr | Tr | Tr | Test |
    fold 4: | Va | Tr | Tr | Tr | Tr | Test |

    Va=Validation, Tr=Training
    """

    def select_slice(self, df, slice_id):
        """
        Returns the index array selecting the events of one of the 2k slices.

        In mod mode a slice holds the events whose variable modulo 2k equals
        slice_id. Otherwise the fractional part of the variable is compared
        against the half-open interval [slice_id / 2k, (slice_id + 1) / 2k).

        NB: This method is for internal usage only. There are more than k
        slices.
        """
        if not self.mod_mode:
            fraction = self.variable(df) % 1
            lower_edge = slice_id / (self.k * 2.0)
            upper_edge = (slice_id + 1.0) / (self.k * 2)
            return (lower_edge <= fraction) & (fraction < upper_edge)

        return (self.variable(df) % (self.k * 2) == slice_id)

    def select_training(self, df, fold_i):
        """
        Returns the index array selecting the training events of the given
        fold: the union of the first k slices except for the slice used for
        validation.
        """
        training_slices = [slice_i for slice_i in range(self.k)
                           if (slice_i + fold_i) % self.k != self.k - 1]

        mask = np.zeros(len(df), dtype='bool')
        for slice_i in training_slices:
            mask = mask | self.select_slice(df, slice_i)

        return mask

    def select_validation(self, df, fold_i):
        """
        Returns the index array selecting the validation events of the given
        fold, which is a single slice out of the first k slices.
        """
        validation_slice = (self.k - fold_i - 1) % self.k
        return self.select_slice(df, validation_slice)

    def select_test(self, df, fold_i):
        """
        Returns the index array selecting the test events: the union of the
        slices k to 2k - 1. The selection does not depend on fold_i.
        """
        mask = np.zeros(len(df), dtype='bool')
        for slice_i in range(self.k, self.k * 2):
            mask = mask | self.select_slice(df, slice_i)

        return mask
class NoTestCV(CrossValidator):
    """
    Uses the whole dataset for training and validation with a single fold. The
    test set is empty.

    fold 0: | Training | Val |

    The NoTestCV can be useful if the test dataset is provided independently
    from the training and validation, for example if a different generator is
    used for the training or if real-time (non-hep) data is used as a "test"
    set.
    """

    def __init__(self, mod_var=None, frac_var=None, k=10):
        """
        The parameter k defines the inverse fraction of the validation set.
        For example, k=5 will allocate 1/5 = 20% of the dataset for validation.
        See the base class for mod_var and frac_var.
        """
        super().__init__(k, mod_var=mod_var, frac_var=frac_var)

    def select_slice(self, df, slice_id):
        """
        Returns the index array selecting the events of one of the k slices.

        In mod mode a slice holds the events whose variable modulo k equals
        slice_id. Otherwise the fractional part of the variable is compared
        against the half-open interval [slice_id / k, (slice_id + 1) / k).

        NB: This method is for internal usage only.
        """
        if not self.mod_mode:
            fraction = self.variable(df) % 1
            lower_edge = slice_id / self.k
            upper_edge = (slice_id + 1.0) / self.k
            return (lower_edge <= fraction) & (fraction < upper_edge)

        return (self.variable(df) % self.k == slice_id)

    def select_training(self, df, fold_i):
        """
        Returns the index array selecting all training events, i.e. the
        union of all slices but slice 0. The fold_i parameter has no effect.
        """
        mask = np.zeros(len(df), dtype='bool')
        for slice_i in range(1, self.k):
            mask = mask | self.select_slice(df, slice_i)

        return mask

    def select_validation(self, df, fold_i):
        """
        Returns the index array selecting all validation events, i.e. slice
        0. The fold_i parameter has no effect.
        """
        return self.select_slice(df, 0)

    def select_test(self, df, fold_i):
        """
        Returns an all-false index array since the test set is empty. The
        fold_i parameter has no effect.
        """
        return np.zeros(len(df), dtype='bool')
class BinaryCV(CrossValidator):
    """
    Defines a training set and a test set using a binary split. There is no
    independent validation set in this case. The BinaryCV should not be used
    for parameter optimization.

    fold 0: | Training   | Test & Val |
    fold 1: | Test & Val | Training   |

    The BinaryCV can be used after parameter optimization with ClassicalCV to
    retrain the model on the full half. The validation performance contained
    in HepNet.history is the test performance.
    """

    def __init__(self, mod_var=None, frac_var=None, k=None):
        """
        k is set to 2. The argument k has no effect. See the base class for
        mod_var and frac_var.
        """
        super().__init__(2, mod_var=mod_var, frac_var=frac_var)

    def select_slice(self, df, slice_id):
        """
        Returns the index array selecting the events of one of the slices.

        In mod mode a slice holds the events whose variable modulo k equals
        slice_id. Otherwise the fractional part of the variable is compared
        against the half-open interval [slice_id / k, (slice_id + 1) / k).

        NB: This method is for internal usage only.
        """
        if not self.mod_mode:
            fraction = self.variable(df) % 1
            lower_edge = slice_id / self.k
            upper_edge = (slice_id + 1.0) / self.k
            return (lower_edge <= fraction) & (fraction < upper_edge)

        return (self.variable(df) % self.k == slice_id)

    def select_training(self, df, fold_i):
        """
        Returns the index array selecting the training events of the given
        fold, which is the slice with the same number as the fold.
        """
        return self.select_slice(df, fold_i)

    def select_validation(self, df, fold_i):
        """
        Returns the index array selecting the validation events of the given
        fold, which is the slice not used for training.
        """
        other_slice = (1 + fold_i) % self.k
        return self.select_slice(df, other_slice)

    def select_test(self, df, fold_i):
        """
        Returns the index array selecting the test events of the given fold,
        which is the slice not used for training (same as validation).
        """
        other_slice = (1 + fold_i) % self.k
        return self.select_slice(df, other_slice)
class MixedCV(CrossValidator):
    """
    Performs the k-fold cross validation where validation and test sets are
    both interleaved.

    fold 0: | Tr | Tr | Tr | Te | Va |
    fold 1: | Tr | Tr | Te | Va | Tr |
    fold 2: | Tr | Te | Va | Tr | Tr |
    fold 3: | Te | Va | Tr | Tr | Tr |
    fold 4: | Va | Tr | Tr | Tr | Te |

    Va=Validation, Tr=Training, Te=Test
    """

    def select_slice(self, df, slice_id):
        """
        Returns the index array selecting the events of one of the k slices.

        In mod mode a slice holds the events whose variable modulo k equals
        slice_id. Otherwise the fractional part of the variable is compared
        against the half-open interval [slice_id / k, (slice_id + 1) / k).

        NB: This method is for internal usage only.
        """
        if not self.mod_mode:
            fraction = self.variable(df) % 1
            lower_edge = slice_id / self.k
            upper_edge = (slice_id + 1.0) / self.k
            return (lower_edge <= fraction) & (fraction < upper_edge)

        return (self.variable(df) % self.k == slice_id)

    def select_training(self, df, fold_i):
        """
        Returns the index array selecting the training events of the given
        fold: the union of all slices except for the validation and the test
        slice of that fold.
        """
        mask = np.zeros(len(df), dtype='bool')

        for slice_i in range(self.k):
            position = (slice_i + fold_i) % self.k
            # Position k - 1 is the validation slice, k - 2 the test slice.
            if position == self.k - 1 or position == self.k - 2:
                continue
            mask = mask | self.select_slice(df, slice_i)

        return mask

    def select_validation(self, df, fold_i):
        """
        Returns the index array selecting the validation events of the given
        fold, which is a single slice.
        """
        validation_slice = (self.k - fold_i - 1) % self.k
        return self.select_slice(df, validation_slice)

    def select_test(self, df, fold_i):
        """
        Returns the index array selecting the test events of the given fold,
        which is the slice preceding the validation slice.
        """
        test_slice = (self.k - fold_i - 2) % self.k
        return self.select_slice(df, test_slice)
class Normalizer(ABC):
    """
    Abstract normalizer which shifts and scales the distribution such that it
    has zero mean and unit width.
    """

    @abstractmethod
    def __init__(self, df, input_list=None):
        """
        Returns a normalizer object with the normalization moments stored
        internally. The input_list argument specifies which inputs should be
        normalized. All other columns are left untouched.
        """

    @abstractmethod
    def __call__(self, df):
        """
        Applies the normalization of the input columns to the given dataframe
        and returns a normalized copy.
        """

    @abstractmethod
    def __eq__(self, other):
        """
        Check if two normalizers are the same.
        """

    @property
    @abstractmethod
    def scales(self):
        """
        Every normalizer must reduce to a simple (offset + scale * x)
        normalization to be used with lwtnn. This property returns the scale
        parameters for all variables.
        """

    @property
    @abstractmethod
    def offsets(self):
        """
        Every normalizer must reduce to a simple (offset + scale * x)
        normalization to be used with lwtnn. This property returns the offset
        parameters for all variables.
        """

    def save_to_h5(self, path, key, overwrite=False):
        """
        Save normalizer definition to a hdf5 file.

        'path' is the file path and 'key' is the path inside the hdf5 file.
        If overwrite is true then already existing file contents are
        overwritten. The class name is stored by this method, everything else
        is delegated to the child class via _save_to_h5().
        """
        open_mode = "w" if overwrite else "a"

        with h5py.File(path, open_mode) as output_file:
            group = output_file.create_group(key)
            # np.string_ was removed in NumPy 2.0; np.bytes_ is the same type.
            group.attrs["class_name"] = np.bytes_(self.__class__.__name__)

        # The child class is handed the path and opens the file on its own,
        # hence our handle is closed first.
        self._save_to_h5(path, key)

    @abstractmethod
    def _save_to_h5(self, path, key):
        """
        Save child class specific definitions to a hdf5 file.

        'path' is the file path and 'key' is the path inside the hdf5 file.
        """

    @classmethod
    def load_from_h5(cls, path, key):
        """
        Create a new normalizer instance from an hdf5 file.

        'path' is the file path and 'key' is the path inside the hdf5 file.
        Returns None if the key does not exist in the file. The concrete
        class is looked up by the stored class name in this module.
        """
        with h5py.File(path, "r") as input_file:
            if key not in input_file:
                return None

            class_name = input_file[key].attrs["class_name"]

        # Depending on how the attribute was written it is returned as bytes
        # or as str.
        if isinstance(class_name, bytes):
            class_name = class_name.decode()

        class_object = getattr(sys.modules[__name__], class_name)
        return class_object._load_from_h5(path, key)

    @classmethod
    @abstractmethod
    def _load_from_h5(cls, path, key):
        """
        Load child class specific definitions from a hdf5 file.
        """
class EstimatorNormalizer(Normalizer):
    """
    Normalizer which uses estimators (mean and standard deviation) to compute
    the normalization moments. This method might lead to sub-optimal results
    if there are outliers.
    """

    def __init__(self, df, input_list=None, center=None, width=None):
        """
        See base class. If both center and width are given, they are used
        as-is and the dataframe is ignored. Otherwise the moments are
        computed from the dataframe (restricted to input_list if given) and
        vanishing widths are replaced by one.
        """
        if center is None or width is None:
            frame = df if input_list is None else df[input_list]

            self.center = frame.mean()
            self.width = frame.std()
            # Constant columns would otherwise cause a division by zero.
            self.width[self.width == 0] = 1
        else:
            self.center = center
            self.width = width

    def __call__(self, df):
        """
        See base class. The normalized columns come first in the returned
        dataframe, followed by all other columns unchanged.
        """
        normalized_columns = list(self.center.index)
        untouched_columns = [column for column in df.columns
                             if column not in normalized_columns]

        result = (df[normalized_columns] - self.center) / self.width
        result[untouched_columns] = df[untouched_columns]

        return result

    def __eq__(self, other):
        """
        See base class. Two estimator normalizers are the same if their
        centers and widths are equal.
        """
        if not isinstance(other, self.__class__):
            return False

        same_center = self.center.equals(other.center)
        if not same_center:
            return False

        same_width = self.width.equals(other.width)
        if not same_width:
            return False

        return True

    def _save_to_h5(self, path, key):
        """
        See base class. Center and width are stored below the given key.
        """
        center_key = os.path.join(key, "center")
        width_key = os.path.join(key, "width")

        self.center.to_hdf(path, key=center_key)
        self.width.to_hdf(path, key=width_key)

    @classmethod
    def _load_from_h5(cls, path, key):
        """
        See base class. Center and width are read from below the given key.
        """
        stored_center = pd.read_hdf(path, os.path.join(key, "center"))
        stored_width = pd.read_hdf(path, os.path.join(key, "width"))

        return cls(None, center=stored_center, width=stored_width)

    @property
    def scales(self):
        """
        See base class. The scale is the inverse width.
        """
        return 1 / self.width

    @property
    def offsets(self):
        """
        See base class. The offset is the negative center divided by the
        width.
        """
        return -self.center / self.width
def normalize_category_weights(df, categories, weight='weight'):
    """
    The categorical weight normalizer acts on the weight variable only. The
    returned dataframe will satisfy the following conditions:

     - The sum of weights of all events is equal to the total number of
       entries.
     - The sum of weights of a category is equal to the total number of entries
       divided by the number of classes. Therefore the sum of weights of two
       categories are equal.
     - The relative weights within a category are unchanged.

    Each category must provide an idx_array(df) method returning a boolean
    index array. The input dataframe is not modified. Events which are not
    part of any category receive a weight of NaN.
    """
    # A shallow copy is enough since only the weight column is replaced (not
    # modified in place); unlike a slice it does not trigger pandas'
    # chained-assignment warning.
    df_out = df.copy(deep=False)

    # NaN marks events without category instead of uninitialised memory.
    w_norm = np.full(len(df), np.nan)
    for category in categories:
        idx = np.asarray(category.idx_array(df))
        w_norm[idx] = df.loc[idx, weight].sum()

    df_out[weight] = df[weight] / w_norm * len(df) / len(categories)
    return df_out
class HepNet:
    """
    Meta model of a concrete neural network around the underlying Keras model.
    The HEP net handles cross validation, normalization of the input
    variables, the input weights, and the actual Keras model. A HEP net has no
    free parameters.
    """

    def __init__(self, keras_model, cross_validator, normalizer, input_list,
                 output_list):
        """
        Creates a new HEP model. The keras model parameter must be a class that
        returns a new instance of the compiled model (The HEP net needs to be
        able to create multiple models, one for each cross validation fold.)

        The cross_validator must be a CrossValidator object.

        The normalizer must be a Normalizer class that returns a normalizer.
        Each cross validation fold uses a separate normalizer with independent
        normalization weights.

        The input and output lists are lists of column names used as input
        and target of the keras model. The input is normalized.
        """
        self.model_cls = keras_model
        self.cv = cross_validator
        self.norm_cls = normalizer
        self.input_list = input_list
        self.output_list = output_list
        self.norms = []
        self.models = []
        self.history = pd.DataFrame()

    @staticmethod
    def _fold_path(path, fold_i):
        """
        Returns the file path of the Keras model of the given fold. The fold
        token is inserted in front of the file extension, or appended if the
        path has no extension.
        """
        path_token = path.rsplit(".", 1)

        if len(path_token) == 1:
            path_token.append(f"fold_{fold_i}")
        else:
            path_token.insert(-1, f"fold_{fold_i}")

        return ".".join(path_token)

    def __eq__(self, other):
        """
        Check if two models have the same configuration, i.e. the same model
        and normalizer classes, cross validator, input and output lists,
        normalizers and training history. The trained Keras models are not
        compared.
        """
        if not isinstance(other, self.__class__):
            return False

        if python_to_str(self.model_cls) != python_to_str(other.model_cls):
            return False

        if self.cv != other.cv:
            return False

        if python_to_str(self.norm_cls) != python_to_str(other.norm_cls):
            return False

        if self.input_list != other.input_list:
            return False

        if self.output_list != other.output_list:
            return False

        if self.norms != other.norms:
            return False

        # DataFrame.equals copes with differently shaped or labelled frames
        # and with NaNs; any difference makes the nets unequal.
        if not self.history.equals(other.history):
            return False

        return True

    def fit(self, df, weight=None, **kwds):
        """
        Calls fit() on all folds. All kwds are passed to fit().

        The weight can be a column name or a variable object. Without weight,
        all events have unit weight. Previously trained models, normalizers
        and the training history are discarded.
        """
        if weight is None:
            weight = Variable("unity", lambda d: np.ones(len(d)))
        elif isinstance(weight, str):
            weight = Variable(weight, weight)

        self.norms = []
        self.models = []
        self.history = pd.DataFrame()
        fold_histories = []

        for fold_i in range(self.cv.k):
            # select training set
            selected = self.cv.select_training(df, fold_i)
            training_df = df[selected]

            # select validation set
            selected = self.cv.select_validation(df, fold_i)
            validation_df = df[selected]

            # seed normalizer on the training set only
            norm = self.norm_cls(training_df, self.input_list)
            self.norms.append(norm)
            training_df = norm(training_df)
            validation_df = norm(validation_df)

            # fit fold
            model = self.model_cls()
            self.models.append(model)

            fit_result = model.fit(
                training_df[self.input_list],
                training_df[self.output_list],
                validation_data=(
                    validation_df[self.input_list],
                    validation_df[self.output_list],
                    np.array(weight(validation_df)),
                ),
                sample_weight=np.array(weight(training_df)),
                **kwds)

            history = fit_result.history
            n_epochs = len(history['loss'])
            history['fold'] = np.full(n_epochs, fold_i, dtype='int')
            history['epoch'] = np.arange(n_epochs)

            # The history is updated after every fold such that it is
            # available even if a later fold fails.
            fold_histories.append(pd.DataFrame(history))
            self.history = pd.concat(fold_histories)

    def predict(self, df, cv='val', retrieve_fold_info=False, **kwds):
        """
        Calls predict() on the Keras model. The argument cv specifies the
        cross validation set to select: 'train', 'val', 'test'.
        Default is 'val'.

        Returns a copy of the selected events with one additional column
        'pred_<output>' per output. If retrieve_fold_info is true, the column
        '<cv>_fold' states the fold whose selection contained the event.

        All other keywords are passed to predict.

        Raises a ValueError if cv is none of the three accepted names.
        """
        if cv not in ['train', 'val', 'test']:
            # The message lists exactly the accepted values.
            raise ValueError("Argument 'cv' must be one of 'train', 'val', "
                             "'test'; but was %s." % repr(cv))

        out = np.zeros((len(df), len(self.output_list)))
        test_set = np.zeros(len(df), dtype='bool')

        for fold_i in range(self.cv.k):
            model = self.models[fold_i]
            norm = self.norms[fold_i]

            # identify fold
            selected = np.asarray(self.cv.select_cv_set(df, cv, fold_i),
                                  dtype='bool')
            if not selected.any():
                # Nothing to predict, e.g. the empty test set of NoTestCV.
                continue

            test_set |= selected
            out[selected] = model.predict(norm(df[selected][self.input_list]),
                                          **kwds)

        test_df = df[test_set]
        out = out[test_set].transpose()
        out = dict(zip(["pred_" + s for s in self.output_list], out))
        test_df = test_df.assign(**out)

        if retrieve_fold_info:
            # The fold info covers the full dataframe and must be reduced to
            # the returned events.
            fold_info = np.asarray(self.cv.retrieve_fold_info(df, cv))
            test_df = test_df.assign(**{cv + "_fold": fold_info[test_set]})

        return test_df

    def save(self, path):
        """
        Save the model and all associated components to a hdf5 file.

        The trained Keras models are stored in separate files next to path,
        one per fold (see _fold_path). Models and normalizers are only stored
        if all folds have been trained.
        """
        # save model architecture and weights (only if already trained)
        if len(self.models) == self.cv.k:
            for fold_i in range(self.cv.k):
                # this is the built-in save function from keras
                self.models[fold_i].save(self._fold_path(path, fold_i))

        with h5py.File(path, "w") as output_file:
            # save default model class
            # since this is an arbitrary piece of python code we need to use
            # the python_to_str function
            group = output_file.create_group("models/default")
            # np.string_ was removed in NumPy 2.0; np.bytes_ is the same type.
            group.attrs["model_cls"] = np.bytes_(python_to_str(self.model_cls))

            # save class name of default normalizer as string
            group = output_file.create_group("normalizers/default")
            group.attrs["norm_cls"] = np.bytes_(self.norm_cls.__name__)

        # All following components open the file on their own, so the handle
        # above must be closed before.

        # save cross_validator
        self.cv.save_to_h5(path, "cross_validator")

        # save normalizer (only if already trained)
        if len(self.norms) == self.cv.k:
            for fold_i in range(self.cv.k):
                self.norms[fold_i].save_to_h5(
                    path, "normalizers/fold_{}".format(fold_i))

        # save input/output lists; key is passed by keyword since newer
        # pandas versions no longer accept it positionally
        pd.DataFrame(self.input_list).to_hdf(path, key="input_list")
        pd.DataFrame(self.output_list).to_hdf(path, key="output_list")

        # save training history
        self.history.to_hdf(path, key="history")

    @classmethod
    def load(cls, path):
        """
        Restore a model from a hdf5 file.

        The trained Keras models and the normalizers are only restored if
        they have been saved, i.e. if the net was trained before saving.
        """
        # load default model and normalizer
        with h5py.File(path, "r") as input_file:
            model_str = input_file["models/default"].attrs["model_cls"]
            normalizer_class_name = \
                input_file["normalizers/default"].attrs["norm_cls"]

        # Depending on how the attributes were written they are returned as
        # bytes or as str.
        if isinstance(model_str, bytes):
            model_str = model_str.decode()
        if isinstance(normalizer_class_name, bytes):
            normalizer_class_name = normalizer_class_name.decode()

        # NOTE(review): str_to_python presumably rebuilds python code stored
        # in the file; only load files from trusted sources -- confirm.
        model = str_to_python(model_str)
        normalizer = getattr(sys.modules[__name__], normalizer_class_name)

        # load cross validator
        cv = CrossValidator.load_from_h5(path, "cross_validator")

        # load input/output lists
        input_list = list(pd.read_hdf(path, key="input_list")[0])
        output_list = list(pd.read_hdf(path, key="output_list")[0])

        # create instance
        instance = cls(model, cv, normalizer, input_list, output_list)

        # load history
        instance.history = pd.read_hdf(path, key="history")

        # load trained models (if existing); save() writes either all fold
        # files or none, so they are only loaded as a complete set
        fold_paths = [cls._fold_path(path, fold_i) for fold_i in range(cv.k)]
        if all(os.path.exists(fold_path) for fold_path in fold_paths):
            for fold_path in fold_paths:
                instance.models.append(keras.models.load_model(fold_path))

        # load normalizer
        for fold_i in range(cv.k):
            norm = Normalizer.load_from_h5(
                path, "normalizers/fold_{}".format(fold_i))
            if norm is not None:
                instance.norms.append(norm)

        return instance

    def export(self, path_base, command="converters/keras2json.py",
               expression=None):
        """
        Exports the network such that it can be converted to lwtnn's json
        format. The method generates a set of files for each cross validation
        fold. For every fold, the architecture, the weights, the input
        variables and their normalization are exported. To simplify the
        conversion to lwtnn's json format, the method also creates a bash
        script which converts all folds.

        The path_base argument should be a path or a name of the network. The
        names of the generated files are created by appending to path_base.

        The optional expression can be used to inject the CAF expression when
        the NN is used. The final json file will contain an entry KEY=VALUE if
        a variable matches the dict key.
        """
        if expression is None:
            expression = {}

        for fold_i in range(self.cv.k):
            # get the architecture as a json string and write it to a file
            arch = self.models[fold_i].to_json()
            with open('%s_arch_%d.json' % (path_base, fold_i), 'w') \
                    as arch_file:
                arch_file.write(arch)

            # now save the weights as an HDF5 file
            self.models[fold_i].save_weights(
                '%s_wght_%d.h5' % (path_base, fold_i))

            with open("%s_vars_%d.json" % (path_base, fold_i), "w") \
                    as variable_file:
                scales = self.norms[fold_i].scales
                offsets = self.norms[fold_i].offsets
                # The normalizer offsets are divided by the scales for the
                # exported file; plain floats keep the values serializable.
                offsets = [float(o / s) for o, s in zip(offsets, scales)]
                scales = [float(s) for s in scales]

                variables = [("%s=%s" % (v, expression[v]))
                             if v in expression else v
                             for v in self.input_list]

                inputs = [dict(name=v, offset=o, scale=s)
                          for v, o, s in zip(variables, offsets, scales)]

                json.dump(dict(inputs=inputs, class_labels=self.output_list),
                          variable_file)

            # The first fold starts a fresh script, all others append to it.
            mode = "w" if fold_i == 0 else "a"
            with open("%s.sh" % path_base, mode) as script_file:
                print(f"{command} {path_base}_arch_{fold_i}.json "
                      f"{path_base}_vars_{fold_i}.json "
                      f"{path_base}_wght_{fold_i}.h5 "
                      f"> {path_base}_{fold_i}.json", file=script_file)