Source code for nnfwtbn.tests.test_plot


import unittest

import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd
import matplotlib.pyplot as plt
import seaborn as sns

from nnfwtbn.variable import Variable
from nnfwtbn.cut import Cut
from nnfwtbn.stack import McStack, DataStack
from nnfwtbn.process import Process
from nnfwtbn.plot import roc, confusion_matrix, atlasify, hist, \
                         HistogramFactory, fill_labels, ATLAS, INFO, \
                         _transpose, _dask_compute, human_readable


[docs]class SurvivalTestCase(unittest.TestCase): """ Test that calling the plotting methods does not cause a crash. (Testing plotting methods is notoriously difficult.) """
[docs] def test_roc(self): """ Check that calling roc() does not crash. """ df = pd.DataFrame({"fpid": [0]*4 + [1]*4, "reco_signal": [0, 1]*4}) p_signal = Process("Signal", range=(0, 0)) p_bkg = Process("Background", range=(1, 1)) try: roc(df, p_signal, p_bkg, 'reco_signal') except Exception: self.fail("Calling roc() failed.")
[docs] def test_roc_return(self): """ Check that the return valueis not None """ df = pd.DataFrame({"fpid": [0]*4 + [1]*4, "reco_signal": [0, 1]*4}) p_signal = Process("Signal", range=(0, 0)) p_bkg = Process("Background", range=(1, 1)) return_value = roc(df, p_signal, p_bkg, 'reco_signal') self.assertIsNotNone(return_value)
[docs] def test_roc_custom_selection(self): """ Check that calling roc() with a custom selection does not crash. """ df = pd.DataFrame({"fpid": [0]*4 + [1]*4, "reco_signal": [0, 1]*4, "is_sr": [0] * 2 + [1]*4 + [0]*2}) p_signal = Process("Signal", range=(0, 0)) p_bkg = Process("Background", range=(1, 1)) c_sr = Cut(lambda d: d.is_sr > 0) try: roc(df, p_signal, p_bkg, 'reco_signal', selection=c_sr) except Exception: self.fail("Calling roc() failed.")
[docs] def test_roc_custom_axis(self): """ Check that calling roc() with an existing axis does not crash. """ df = pd.DataFrame({"fpid": [0]*4 + [1]*4, "reco_signal": [0, 1]*4}) p_signal = Process("Signal", range=(0, 0)) p_bkg = Process("Background", range=(1, 1)) fig, ax = plt.subplots() try: roc(df, p_signal, p_bkg, 'reco_signal', axes=ax) except Exception: self.fail("Calling roc() failed.")
[docs] def test_confusion_matrix(self): """ Check that calling confusion_matrix() does not raise an exception. """ df = pd.DataFrame({ "sick": [0, 1, 1, 0, 0], "positive": [0, 0, 1, 1, 0], "weight": [1, 1, 1, 1.7, 1], }) positive = Cut(lambda d: d.positive == 1, label="+") negative = Cut(lambda d: d.positive == 0, label="-") sick = Cut(lambda d: d.sick == 1, label="sick") healthy = Cut(lambda d: d.sick == 0, label="healthy") try: data = confusion_matrix(df, [positive, negative], [sick, healthy], "Test result", "Truth", weight=Variable("weight", "weight"), annot=True) except Exception: self.fail("Calling confusion_matrix() failed.")
[docs] def test_confusion_matrix_return(self): """ Check that the return value is not None. """ df = pd.DataFrame({ "sick": [0, 1, 1, 0, 0], "positive": [0, 0, 1, 1, 0], "weight": [1, 1, 1, 1.7, 1], }) positive = Cut(lambda d: d.positive == 1, label="+") negative = Cut(lambda d: d.positive == 0, label="-") sick = Cut(lambda d: d.sick == 1, label="sick") healthy = Cut(lambda d: d.sick == 0, label="healthy") data = confusion_matrix(df, [positive, negative], [sick, healthy], "Test result", "Truth", weight=Variable("weight", "weight"), annot=True) self.assertIsNotNone(data)
[docs] def test_confusion_matrix_argument_reverse(self): """ Check that confusion_matrix does not change the arguments. """ df = pd.DataFrame({ "sick": [0, 1, 1, 0, 0], "positive": [0, 0, 1, 1, 0], "weight": [1, 1, 1, 1.7, 1], }) positive = Cut(lambda d: d.positive == 1, label="+") negative = Cut(lambda d: d.positive == 0, label="-") sick = Cut(lambda d: d.sick == 1, label="sick") healthy = Cut(lambda d: d.sick == 0, label="healthy") x = [positive, negative] y = [sick, healthy] confusion_matrix(df, x, y, "Test result", "Truth", weight=Variable("weight", "weight")) self.assertEqual(x, [positive, negative]) self.assertEqual(y, [sick, healthy])
[docs] def test_atlasify(self): """ Check that calling atlasify() does not raise an error. """ df = pd.DataFrame({"fpid": [0]*4 + [1]*4, "reco_signal": [0, 1]*4}) p_signal = Process("Signal", range=(0, 0)) p_bkg = Process("Background", range=(1, 1)) roc(df, p_signal, p_bkg, 'reco_signal') try: atlasify() except Exception: self.fail("Calling atlasify() failed.")
[docs] def test_hist(self): """ Check that calling hist() does not raise an exception. """ df = pd.DataFrame({"fpid": [0]*4 + [1]*4, "reco_signal": [0, 1]*4, "is_sr": [0] * 2 + [1]*4 + [0]*2, "weight": [1, 2, 3, 4]*2}) v_higgs = Variable(r"$H$", "reco_signal") c_vbf = Cut(lambda d: d.is_sr == 1) p_ggh = Process("$gg$F", range=(0, 0)) p_vbfh = Process("VBF", range=(1, 1)) s_all = McStack(p_ggh, p_vbfh) try: hist(df, v_higgs, selection=c_vbf, bins=20, stacks=[s_all], range=(50, 450), weight="weight") except Exception: self.fail("Calling hist() failed.")
[docs] def test_hist_return(self): """ Check that the return value is not None. """ df = pd.DataFrame({"fpid": [0]*4 + [1]*4, "reco_signal": [0, 1]*4, "is_sr": [0] * 2 + [1]*4 + [0]*2, "weight": [1, 2, 3, 4]*2}) v_higgs = Variable(r"$H$", "reco_signal") c_vbf = Cut(lambda d: d.is_sr == 1) p_ggh = Process("$gg$F", range=(0, 0)) p_vbfh = Process("VBF", range=(1, 1)) s_all = McStack(p_ggh, p_vbfh) fig, axes = hist(df, v_higgs, selection=c_vbf, bins=20, stacks=[s_all], range=(50, 450), weight="weight") self.assertIsNotNone(fig) self.assertIsNotNone(axes)
[docs] def test_hist_return_wo_ratio(self): """ Check that the return value is not None if there is no ratio plot. """ df = pd.DataFrame({"fpid": [0]*4 + [1]*4, "reco_signal": [0, 1]*4, "is_sr": [0] * 2 + [1]*4 + [0]*2, "weight": [1, 2, 3, 4]*2}) v_higgs = Variable(r"$H$", "reco_signal") c_vbf = Cut(lambda d: d.is_sr == 1) p_ggh = Process("$gg$F", range=(0, 0)) p_vbfh = Process("VBF", range=(1, 1)) s_all = McStack(p_ggh, p_vbfh) fig, axes = hist(df, v_higgs, selection=c_vbf, bins=20, stacks=[s_all], range=(50, 450), weight="weight", numerator=None) self.assertIsNotNone(fig) self.assertIsNotNone(axes)
[docs] def test_hist_facory(self): """ Check that calling a HistogramFactory does not raise an exception. """ df = pd.DataFrame({"fpid": [0]*4 + [1]*4, "reco_signal": [0, 1]*4, "is_sr": [0] * 2 + [1]*4 + [0]*2, "weight": [1, 2, 3, 4]*2}) v_higgs = Variable(r"$H$", "reco_signal") c_vbf = Cut(lambda d: d.is_sr == 1) p_ggh = Process("$gg$F", range=(0, 0)) p_vbfh = Process("VBF", range=(1, 1)) s_all = McStack(p_ggh, p_vbfh) hist_factory = HistogramFactory(df, selection=c_vbf, stacks=[s_all], weight="weight") try: hist_factory(v_higgs, bins=20, range=(50, 450)) except Exception: self.fail("Calling a HistogramFactory failed.")
[docs] def test_hist_wo_ratio(self): """ Check that creating a plot without ratio plot does not crash. """ df = pd.DataFrame({"fpid": [0]*4 + [1]*4, "reco_signal": [0, 1]*4, "is_sr": [0] * 2 + [1]*4 + [0]*2, "weight": [1, 2, 3, 4]*2}) v_higgs = Variable(r"$H$", "reco_signal") c_vbf = Cut(lambda d: d.is_sr == 1) p_ggh = Process("$gg$F", range=(0, 0)) p_vbfh = Process("VBF", range=(1, 1)) s_all = McStack(p_ggh, p_vbfh) try: hist(df, v_higgs, selection=c_vbf, bins=20, stacks=[s_all], range=(50, 450), weight="weight", numerator=None) except Exception: self.fail("Calling hist() w/o ratio plot failed.")
class PlotTestCases(unittest.TestCase):
    """
    Functional tests for the plot module. In contrast to survival tests,
    these compare the results of the functions against expected values.
    """

    def test_fill_labels(self):
        """
        Verify that fill_labels() replaces None arguments by the module
        defaults and passes explicit strings through.
        """
        # Both labels missing: both module defaults are used.
        neither_given = fill_labels(None, None)
        self.assertEqual(neither_given, (ATLAS, INFO))

        # Only the second label given.
        info_given = fill_labels(None, "Hello")
        self.assertEqual(info_given, (ATLAS, "Hello"))

        # Only the first label given.
        atlas_given = fill_labels("WIP", None)
        self.assertEqual(atlas_given, ("WIP", INFO))

        # Both labels given: nothing is substituted.
        both_given = fill_labels("WIP", "World")
        self.assertEqual(both_given, ("WIP", "World"))

    def test_roc_area(self):
        """
        Verify the area and its uncertainty returned by roc() for a toy
        sample whose expected area is derived analytically.
        """
        n_points = 2000
        grid = np.linspace(0, 1, n_points)

        # Two copies of the same grid: the first carries fpid 0 with weight
        # 1 - x, the second fpid 1 with weight x.
        frame = pd.DataFrame({
            "x": np.concatenate([grid, grid]),
            "fpid": [0] * n_points + [1] * n_points,
            "weight": np.concatenate([1 - grid, grid]),
        })

        signal = Process("Signal", range=(1, 1))
        background = Process("Bkg", range=(0, 0))
        weight_var = Variable("weight", "weight")
        x_var = Variable("x", "x")

        auc, auc_error = roc(frame, signal, background, x_var,
                             weight=weight_var, return_auc=True)

        # ====================
        # Analytic Computation
        # ====================
        #
        # Initial distribution:
        #   sig = x
        #   bkg = 1 - x
        #   i sig = x^2 / 2
        #   i bkg = x - x^2 / 2
        #
        #   y_sig(x_cut) = 1 - x^2
        #   y_bkg(x_cut) = 1 - 2 * x + x^2
        #
        # ROC curve:
        #   x_cut(y_sig) = sqrt(1 - y_sig)
        #   y_bkg(y_sig) = 2 - 2 * sqrt(1 - y_sig) - y_sig
        #   r_bkg(y_sig) = - 1 + 2 * sqrt(1 - y_sig) + y_sig
        #
        # Integration:
        #   i r_bkg(y_sig) = -y_sig - 4/3 * (1 - y_sig)^(3/2) + y_sig^2 / 2
        #   i r_bkg(0) = 0 - 4/3 * 1 = -4/3
        #   i r_bkg(1) = -1 + 0.5
        #   i r_bkg[0..1] = -0.5 - (-4/3) = (8-3) / 6 = 5 / 6
        #
        # Expected area: 5/6
        self.assertLess(abs(auc - 5/6), auc_error)
        self.assertGreater(auc, 0.5)
        self.assertLess(auc_error, 0.01)
        self.assertGreater(auc_error, 0)
[docs]class TransposeTesCase(unittest.TestCase): """Test the implementation of _transpose()"""
[docs] def test_rect(self): """Check that a rectangular array is transposed.""" array = [[1, 2, 3], [5, 6, 7]] self.assertEqual(_transpose(array), [[1, 5], [2, 6], [3, 7]])
[docs] def test_square(self): """Check that a square array is transposed.""" array = [[1, 2, 3], [5, 6, 7], ["a", "b", "c"]] self.assertEqual(_transpose(array), [[1, 5, "a"], [2, 6, "b"], [3, 7, "c"]])
[docs] def test_row(self): """Check that a row is turned into a column""" array = [[1], [2], [3]] self.assertEqual(_transpose(array), [[1, 2, 3]])
[docs] def tes_col(self): """Check that a column is turned into a row""" array = [[1, 2, 3]] self.assertEqual(_transpose(array), [[1], [2], [3]])
[docs] def test_single(self): """Check that a single-item-list is no changed.""" array = [[1]] self.assertEqual(_transpose(array), [[1]])
[docs] def test_empty(self): """Check that a empty list and a list of an empty list handled""" array = [[]] self.assertEqual(_transpose(array), [[]]) array = [] self.assertEqual(_transpose(array), [[]])
[docs] def test_non_modify(self): """Check that the original arrays are not modified""" array = [[1, 2, 3], [5, 6, 7]] _transpose(array) self.assertEqual(array,[[1, 2, 3], [5, 6, 7]])
class DaskComputeTestCase(unittest.TestCase):
    """Tests for the helper _dask_compute()."""

    def test_dask(self):
        """Verify the result for a nested list of dask sums."""
        frame = pd.DataFrame({'x': [1, 2, 3], 'y': [11, 12, 13]})
        lazy_frame = dd.from_pandas(frame, npartitions=1)

        # One inner list per column, one scaled sum per factor.
        nested_sums = [
            [(lazy_frame[column] * factor).sum() for factor in (1, 2, 3)]
            for column in ('x', 'y')
        ]

        self.assertEqual(_dask_compute(nested_sums),
                         [[6, 12, 18], [36, 72, 108]])

    def test_pandas(self):
        """Verify the result for a nested list of plain pandas sums."""
        frame = pd.DataFrame({'x': [1, 2, 3], 'y': [11, 12, 13]})

        # Same layout as in the dask test, but the sums are already numbers.
        nested_sums = [
            [(frame[column] * factor).sum() for factor in (1, 2, 3)]
            for column in ('x', 'y')
        ]

        self.assertEqual(_dask_compute(nested_sums),
                         [[6, 12, 18], [36, 72, 108]])
[docs]class HelperMethodsTestCase(unittest.TestCase): """Check that helper methods are implemented correctly""" def test_human_readable(self): """Check inner chars are replaced by _""" label = human_readable("Hello$% world") self.assertEqual(label, "Hello_world")
[docs] def test_human_readable(self): """Check start or end chars are removed""" label = human_readable("'HelloWorld!'") self.assertEqual(label, "HelloWorld")