import math
import re
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import AutoMinorLocator, LogLocator
from matplotlib.transforms import ScaledTranslation
import pandas as pd
import dask
import dask.dataframe as dd
import seaborn as sns
from atlasify import atlasify
import uhepp
from nnfwtbn.stack import Stack
from nnfwtbn.process import Process
from nnfwtbn.cut import Cut
from nnfwtbn.variable import Variable, BlindingStrategy
import nnfwtbn.error as err
# Store built-in range
range_ = range
ATLAS = "Internal"
INFO = "$\sqrt{s} = 13\,\mathrm{TeV}$, $139\,\mathrm{fb}^{-1}$\nnnfwtbn"
[docs]def fill_labels(label, info):
if label is None:
label = ATLAS
if info is None:
info = INFO
return label, info
[docs]def human_readable(label):
"""Convert labels to plain ascii strings"""
pttrn = "[^a-zA-Z0-9_-]+"
label = re.sub(f"(^{pttrn}|{pttrn}$)", "", label)
label = re.sub(pttrn, "_", label)
return label
[docs]class HistogramFactory:
"""
Short-cut to create multiple histogram with the same set of processes or
in the same region.
"""
[docs] def __init__(self, *args, **kwds):
"""
Accepts any number of positional and keyword arguments. The arguments
are stored internally and use default value for hist(). See __call__().
"""
self.args = args
self.kwds = kwds
[docs] def __call__(self, *args, **kwds):
"""
Proxy for method to hist(). The positional argument passed to hist()
are the positional argument given to the constructor concatenated with
the positional argument given to this method. The keyword argument for
hist() is the union of the keyword arguments passed to the constructor
and this method. The argument passed to this method have precedence.
The method returns the return value of hist.
"""
return hist(*self.args, *args, **dict(self.kwds, **kwds))
[docs]def hist(dataframe, variable, bins, stacks, selection=None,
range=None, blind=None, figure_size=None,
weight=None, y_log=False, y_min=None, vlines=[],
denominator=0, numerator=-1, ratio_label=None, diff=False,
ratio_range=None, atlas=None, info=None, enlarge=1.6,
density=False, include_outside=False, return_uhepp=False, **kwds):
"""
Creates a histogram of stacked processes. The first argument is the
dataframe to operate on. The 'variable' argument defines the x-axis. The
variable argument can be a Variable object or a string naming a column in
the dataframe.
The 'bins' argument can be an integer specifying the number of bins or a
list with all bin boundaries. If it is an integer, the argument range is
mandatory. The range argument must be a tuple with the lowest and highest
bin edge. The properties of a Variable object are used for the x- and
y-axis labels.
Stacks must be Stack objects. The plotting style is defined via the stack
object.
The optional blind argument controls which stack should be blinded. The
argument can be a single stack, a list of stacks or None. By default,
no stack is blinded.
This method creates a new figure and axes internally (handled by uhepp).
The figure size can be changed with the figure_size argument. If this
argument is not None, it must be a tuple of (width, height).
The method returns (figure, axes) which were used during plotting. This
might be identical to the figure and axes arguments. If a ratio plot is
drawn, the axes return value is a list of main, ratio plot.
The weight is used to weight the entries. Entries have unit
weight if omitted. The argument can be a string name of a column or a
variable object.
If the y_log argument is set to True, the y axis will be logarithmic. The
axis is enlarged on a logarithmic scale to make room for the ATLAS labels.
The optional y_min argument can be used to set the lower limit of the y
axis. The default is 0 for linear scale, and 1 for logarithmic scale.
The option vlines can be used to draw vertical lines onto the histogram,
e.g., a cut line. The argument should be an array, one item for each line.
If the item is a number a red line will be drawn at that x-position. If it
is a dict, it will take the item 'x' to determine the position, all other
keywords are passed to matplotlibs plot method.`
The ratio_label option controls the label of the ratio plot.
The ratio_range argument control the y-range of the ratio plot. If set to
None, it will scale automatically to include all points. The default is
is None.
If diff is set to True, The difference between the 'numerator' and the
'denominator' is down instead of their ratio.
The module constants ATLAS and INFO are passed to atlasify. Overwrite them
to change the badges.
If the density argument is True, the area of each stack is normalized to
unity.
If return_uhepp is True, the method return a UHepPlot object.
"""
##################
# Wrap column string by variable
if isinstance(variable, str):
variable = Variable(variable, variable)
##################
# Handle range/bins
if range is not None:
# Build bins
if not isinstance(bins, int):
raise err.InvalidBins("When range is given, bins must be int.")
if not isinstance(range, tuple) or len(range) != 2:
raise err.InvalidProcessSelection("Range argument must be a "
"tuple of two numbers.")
bins = np.linspace(range[0], range[1], bins + 1)
bin_edges = [float(x) for x in bins]
##################
# Create UHepp object
uhepp_obj = uhepp.UHeppHist(variable.name, bin_edges)
uhepp_obj.producer = "nnfwtbn"
##################
# Unit
if variable.unit is not None:
uhepp_obj.unit = variable.unit
##################
# Branding
atlas, info = fill_labels(atlas, info)
if atlas is not False:
uhepp_obj.brand = "ATLAS"
uhepp_obj.brand_label = atlas
if info is not False:
uhepp_obj.subtext = info
##################
# Y-Axis
uhepp_obj.y_log = y_log
##################
# Weight
if weight is None:
weight = Variable("unity", lambda d: variable(d) * 0 + 1)
elif isinstance(weight, str):
weight = Variable(weight, weight)
squared_weight = Variable("squared weight", lambda d: weight(d)**2)
##################
# Ratio
draw_ratio = (denominator is not None) and (numerator is not None)
# Disable ratio plots if there is only one stack
if len(stacks) == 1 and \
isinstance(denominator, int) and \
isinstance(numerator, int):
draw_ratio = False
##################
# Handle selection
if selection is None:
selection = Cut(lambda d: variable(d) * 0 == 0)
elif not isinstance(selection, Cut):
selection = Cut(selection)
##################
# Create separate under- and overflow bin
uhepp_obj.include_overflow = include_outside
uhepp_obj.include_underflow = include_outside
def pad(edges):
padded = ['-inf'] + list(edges) + ['inf']
padded = [float(x) for x in padded]
return np.asarray(padded)
bins = pad(bins)
##################
# Blinding
if blind is None:
blind = []
elif isinstance(blind, Stack):
# Wrap scalar blind stack
blind = [blind]
##################
# Handle stack
yields = {} # temporary, potentially delayed
for i_stack, stack in enumerate(stacks):
stack_items = []
if stack in blind and variable.blinding is not None:
c_blind = variable.blinding(variable, bins[1:-1])
else:
c_blind = lambda d: d
density_norm = 1.0
if density:
density_norm = stack.get_total(dataframe,
[float('-inf'), float('inf')],
variable, weight, False).sum()
for i_process, process in enumerate(stack.processes):
histogram = stack.get_hist(c_blind(dataframe), i_process, bins,
variable, weight, False)
histogram = histogram / density_norm
uncertainty = stack.get_uncertainty(c_blind(dataframe), i_process,
bins, variable,
weight, False)
uncertainty = uncertainty / density_norm
human_readable_label = human_readable(process.label)
process_id = f"{human_readable_label}__s{i_stack}_p{i_process}"
yields[process_id] = (histogram, uncertainty)
style = stack.get_aux(i_process)
uhepp_sitem = uhepp.StackItem([process_id], process.label, **style)
stack_items.append(uhepp_sitem)
bartype = stack.get_histtype(0)
uhepp_stack = uhepp.Stack(stack_items, bartype)
uhepp_obj.stacks.append(uhepp_stack)
# Resolve numerator/denominator indices
if isinstance(numerator, int) and stacks[numerator] == stack:
numerator = stack
if isinstance(denominator, int) and stacks[denominator] == stack:
denominator = stack
if density:
uhepp_obj.y_label = "Events (a.u.)"
uhepp_obj.ratio_label = ratio_label
if ratio_range:
uhepp_obj.ratio_min = ratio_range[0]
uhepp_obj.ratio_max = ratio_range[1]
if draw_ratio:
ratio = []
# Get denominator hist
if denominator in blind and variable.blinding is not None:
c_blind = variable.blinding(variable, bins[1:-1])
else:
c_blind = lambda d: d
base = denominator.get_total(c_blind(dataframe), bins,
variable, weight, False)
stat = denominator.get_total_uncertainty(c_blind(dataframe),
bins, variable,
weight, False)
yields["den"] = (base, stat)
# Process numerators
numerators = numerator
if not isinstance(numerators, (list, tuple)):
numerators = [numerators]
numerators_data = []
for i_numerator, numerator in enumerate(numerators):
if numerator in blind and variable.blinding is not None:
c_blind = variable.blinding(variable, bins[1:-1])
else:
c_blind = lambda d: d
process_id = f"num_{i_numerator}"
base = numerator.get_total(c_blind(dataframe), bins,
variable, weight, False)
stat = numerator.get_total_uncertainty(c_blind(dataframe),
bins, variable,
weight, False)
yields[process_id] = (base, stat)
histtype = numerator.get_histtype(0)
bartype = "points" if histtype == "points" else "step"
style = {} #'markersize': 4, 'fmt': 'o'}
style.update(numerator.get_aux(0))
uhepp_ritem = uhepp.RatioItem([process_id], ["den"], bartype, **style)
uhepp_obj.ratio.append(uhepp_ritem)
##################
# Compute delayed dask
yields, = dask.compute(yields)
for key, item in yields.items():
uhepp_obj.yields[key] = uhepp.Yield(*item)
##################
# Reorder if y-axis is log
def total_stackitem(uhepp_obj, stack_item):
"""Compute the total yield of a stack item"""
process_totals = [sum(uhepp_obj.get_base(yield_name))
for yield_name in stack_item.yield_names]
return sum(process_totals)
if y_log:
for stack in uhepp_obj.stacks:
stack.content.sort(key=lambda x: total_stackitem(uhepp_obj, x))
##################
# Vertical lines
for vline in vlines:
if isinstance(vline, (int, float)):
uhepp_obj.v_lines.append(uhepp.VLine(vline))
else:
pos_x = vline.pop("x")
uhepp_obj.v_lines.append(uhepp.VLine(pos_x, **vline))
if figure_size is not None:
uhepp_obj.figure_size = figure_size
if return_uhepp:
return uhepp_obj
else:
return uhepp_obj.render()
def _transpose(array):
"""Return a transposed version of the array"""
if array == [] or array == [[]]:
return [[]]
return list(map(list, zip(*array)))
def _dask_compute(array):
"""Call dask.compute() on flattened list and return the structured result"""
if not array or not array[0]:
return array
x_len = len(array)
y_len = len(array[0])
flat = [i for s in array for i in s]
flat = dask.compute(*flat)
return [[flat[x * y_len + y] for y in range(y_len)] for x in range(x_len)]
[docs]def confusion_matrix(df, x_processes, y_processes, x_label, y_label,
weight=None, axes=None, figure=None, atlas=ATLAS,
info=None, enlarge=1.3, normalize_rows=False, **kwds):
"""
Creates a confusion matrix.
"""
# Wrap column string by variable
if weight is None:
weight = Variable("unity", lambda d: np.ones(len(d)))
elif isinstance(weight, str):
weight = Variable(weight, weight)
# Handle axes, figure
if figure is None:
figure, axes = plt.subplots() #figsize=(5,5))
elif axes is None:
axes = figure.subplots()
y_processes = y_processes[::-1]
if normalize_rows:
# Swap processes
y_processes, x_processes = x_processes, y_processes
# Need a nested-list (not numpy array) to store Dask scalars
data = [[0 for p in x_processes] for q in y_processes]
for i_x, x_process in enumerate(x_processes):
x_df = x_process(df)
total_weight = weight(x_df).sum()
for i_y, y_process in enumerate(y_processes):
x_y_df = y_process(x_df)
data[i_y][i_x] = weight(x_y_df).sum() / total_weight
if normalize_rows:
# Swap processes back
y_processes, x_processes = x_processes, y_processes
data = _transpose(data)
data = _dask_compute(data)
data = pd.DataFrame(data,
columns=[p.label for p in x_processes],
index=[p.label for p in y_processes])
if normalize_rows:
# Swap labels
y_label, x_label = x_label, y_label
sns.heatmap(data, **dict(vmin=0, vmax=1, cmap="Greens", ax=axes,
cbar_kws={
'label': "$P($%s$|$%s$)$" % (y_label, x_label)
}
), **kwds)
if normalize_rows:
# Swap labels back
y_label, x_label = x_label, y_label
axes.set_xlabel(x_label)
axes.set_ylabel(y_label)
axes.set_ylim(len(y_processes), 0)
if (atlas is not False) or (info is not False):
atlasify(*fill_labels(atlas, info), axes=axes, outside=True)
figure.subplots_adjust(top=1/enlarge)
return data
[docs]def roc(df, signal_process, background_process, discriminant, steps=100,
selection=None, min=None, max=None, axes=None, weight=None,
atlas=ATLAS, info=None, enlarge=1.3,
return_auc=False):
"""
Creates a ROC.
The method returns a dataframe with the signal efficiency and background
rejection columns. The length of the dataframe equals the steps parameter.
If return_auc is True, the method returns a tuple with the area under the
curve and an uncertainty estimation on the area.
"""
# Wrap column string by variable
if discriminant is None:
discriminant = Variable("unity", lambda d: np.ones(len(d)))
elif isinstance(discriminant, str):
discriminant = Variable(discriminant, discriminant)
if weight is None:
weight = Variable("unity", lambda d: np.ones(len(d)))
elif isinstance(weight, str):
weight = Variable(weight, weight)
# Handle selection
if selection is None:
selection = lambda df: df # Identity
elif not isinstance(selection, Cut):
selection = Cut(selection)
# Handle axes
if axes is None:
fig, axes = plt.subplots()
if min is None:
min = discriminant(df).min()
if max is None:
max = discriminant(df).max()
df = selection(df)
signal_effs = []
background_ieffs = []
n_total_sig = weight(signal_process.selection(df)).sum()
n_total_bkg = weight(background_process.selection(df)).sum()
for cut_value in np.linspace(min, max, steps):
residual_df = df[discriminant(df) >= cut_value]
n_total = weight(residual_df).sum()
if n_total == 0:
continue
signal_df = signal_process.selection(residual_df)
background_df = background_process.selection(residual_df)
n_signal = weight(signal_df).sum()
n_background = weight(background_df).sum()
signal_effs.append(n_signal / n_total_sig)
background_ieffs.append(1 - n_background / n_total_bkg)
data = pd.DataFrame({"Signal efficiency": signal_effs,
"1 - Background efficiency": background_ieffs})
sns.lineplot(x="Signal efficiency", y="1 - Background efficiency", data=data,
ax=axes, ci=None, label=discriminant.name)
axes.plot([0, 1], [1, 0], color='gray', linestyle=':')
axes.set_xlim((0, 1))
axes.set_ylim((0, enlarge))
axes.legend(loc=1, frameon=False)
axes.tick_params("both", which="both", direction="in")
axes.tick_params("both", which="major", length=6)
axes.tick_params("both", which="minor", length=3)
axes.tick_params("x", which="both", top=True)
axes.tick_params("y", which="both", right=True)
axes.xaxis.set_minor_locator(AutoMinorLocator())
axes.yaxis.set_minor_locator(AutoMinorLocator())
if (atlas is not False) or (info is not False):
atlasify(*fill_labels(atlas, info), enlarge=1.0, axes=axes)
if return_auc:
# calculate rectangular approximations of AUC
# one of these will be too high, the other too low
# the difference gives a measure of uncertainty
auc_approx_a = 0
auc_approx_b = 0
for i_bin in range(1,len(signal_effs)):
bin_width = abs(signal_effs[i_bin] - signal_effs[i_bin-1])
auc_approx_a += bin_width * background_ieffs[i_bin]
auc_approx_b += bin_width * background_ieffs[i_bin - 1]
auc_mean = (auc_approx_a + auc_approx_b)/2
auc_diff = abs(auc_approx_a - auc_approx_b)/2
auc_mean = auc_mean if auc_mean > 0.5 else 1 - auc_mean
return (auc_mean, auc_diff)
else:
return data
[docs]def correlation_matrix(df, variables,
weight=None, axes=None, figure=None, atlas=ATLAS,
info=None, enlarge=1.3, normalize_rows=False, **kwds):
"""
Plot the Pearson correlation coefficient matrix. The square matrix is
returned as a DataFrame.
"""
if weight is None:
weight = Variable("unity", lambda d: np.ones(len(d)))
elif isinstance(weight, str):
weight = Variable(weight, weight)
kwds.setdefault("annot", True)
kwds.setdefault("fmt", "2.0f")
# Handle axes, figure
if figure is None:
figure, axes = plt.subplots() #figsize=(5,5))
elif axes is None:
axes = figure.subplots()
var_data = [v(df) for v in variables]
var_data = dask.compute(*var_data)
data = np.cov(np.array([v.to_numpy() for v in var_data]),
aweights=weight(df))
var = np.diag(data)
data = data / np.sqrt(var)
data = data.T / np.sqrt(var)
data = pd.DataFrame(data,
columns=[v.name for v in variables],
index=[v.name for v in variables])
sns.heatmap(data * 100, **dict(vmin=-100, vmax=100, cmap="RdBu_r", ax=axes,
cbar_kws={
'label': "Correlation coefficient $\\rho \\cdot 100$"
}
), **kwds)
axes.set_ylim(len(variables), 0)
if (atlas is not False) or (info is not False):
atlasify(*fill_labels(atlas, info), axes=axes, outside=True)
figure.subplots_adjust(top=1/enlarge)
return data