import unittest
import pandas as pd
import numpy as np
from nnfwtbn.process import Process
from nnfwtbn.variable import Variable
from nnfwtbn.stack import Stack, McStack, DataStack, not_none_or_default
class TestStack(unittest.TestCase):
    """
    Test the implementation of the Stack class.
    """

    @staticmethod
    def _toy_inputs():
        """
        Build the toy inputs shared by the histogram and uncertainty tests.

        The frame holds ten events with fpid 0, the same ten x values again
        with fpid 1 (and different weights), and one extra event with
        fpid -1 and weight 42 that none of the processes created in these
        tests selects.

        Returns the tuple (df, bins, v_x, v_weight, p_first, p_second) where
        p_first selects fpid == 0 and p_second selects fpid == 1.
        """
        df = pd.DataFrame({
            "x": [-0.99, -1.67, -0.49, -0.97, 1.14, 0.78, 0.43, 1.24, -1.02,
                  0.77, -0.99, -1.67, -0.49, -0.97, 1.14, 0.78, 0.43, 1.24,
                  -1.02, 0.77, 0],
            "weight": [1.1, 0.9, 1.2, 0.8, 1.3, 0.7, 1.4, 0.6, 1.5, 0.5,
                       1.6, 0.4, 1.7, 0.3, 1.8, 0.2, 1.9, 0.1, 2, 1.0, 42],
            "fpid": [0]*10 + [1]*10 + [-1]})
        bins = np.array([-2, -1, 0, 1, 2])
        v_x = Variable("x", "x")
        v_weight = Variable("weight", "weight")
        p_first = Process("First", lambda d: d.fpid == 0)
        p_second = Process("Second", lambda d: d.fpid == 1)
        return df, bins, v_x, v_weight, p_first, p_second

    def test_init_store_processes(self):
        """
        Check that the constructor stores the processes passed to it.
        """
        p_signal = Process("Signal", range=(0, 0))
        p_bkg = Process("Background", range=(1, 1))

        stack = Stack(p_signal, p_bkg)

        # must be list to be extendable
        self.assertEqual(stack.processes, [p_signal, p_bkg])
        self.assertEqual(stack.histtypes, [None, None])
        self.assertEqual(stack.data_uncertainties, [None, None])

    def test_init_store_processes_empty(self):
        """
        Check that passing no processes to the constructor works.
        """
        stack = Stack()

        # must be list to be extendable
        self.assertEqual(stack.processes, [])
        self.assertEqual(stack.histtypes, [])
        self.assertEqual(stack.data_uncertainties, [])

    def test_init_store_kwds(self):
        """
        Check that the keyword arguments passed to the constructor are stored.
        """
        stack = Stack(histtype='step', data_uncertainty=True, linestyle='-.')

        self.assertEqual(stack.default_aux, {'linestyle': '-.'})
        self.assertEqual(stack.default_histtype, 'step')
        self.assertEqual(stack.default_data_uncertainty, True)

    def test_init_defaults(self):
        """
        Check that the default values for histtype and data_uncertainty are
        not None.
        """
        stack = Stack()

        self.assertIsNotNone(stack.default_histtype)
        self.assertIsNotNone(stack.default_data_uncertainty)

    def test_add_process_store_process(self):
        """
        Check that add_process() stores the new process.
        """
        p_Top = Process("Top", range=(2, 2))

        stack = Stack()
        stack.add_process(p_Top)

        self.assertEqual(stack.processes, [p_Top])
        self.assertEqual(stack.histtypes, [None])
        self.assertEqual(stack.data_uncertainties, [None])
        self.assertEqual(stack.aux, [{}])

    def test_add_process_append_process(self):
        """
        Check that adding a process appends the new process to the internal
        lists.
        """
        p_signal = Process("Signal", range=(0, 0))
        p_Ztt = Process(r"$Z\rightarrow\tau\tau$", range=(1, 1))
        p_Top = Process("Top", range=(2, 2))

        stack = Stack(p_signal, p_Ztt)
        stack.add_process(p_Top)

        self.assertEqual(stack.processes, [p_signal, p_Ztt, p_Top])
        self.assertEqual(stack.histtypes, [None, None, None])
        self.assertEqual(stack.data_uncertainties, [None, None, None])
        self.assertEqual(stack.aux, [{}, {}, {}])

    def test_add_process_store_kwds(self):
        """
        Check that add_process() stores the custom kwds of the process.
        """
        p_Top = Process("Top", range=(2, 2))

        stack = Stack()
        stack.add_process(p_Top, histtype='points', data_uncertainty=False,
                          linestyle=':')

        self.assertEqual(stack.histtypes, ['points'])
        self.assertEqual(stack.data_uncertainties, [False])
        self.assertEqual(stack.aux, [{'linestyle': ':'}])

    def test_add_process_append_kwds(self):
        """
        Check that adding a process appends the custom kwds to the internal
        lists.
        """
        p_signal = Process("Signal", range=(0, 0))
        p_Ztt = Process(r"$Z\rightarrow\tau\tau$", range=(1, 1))
        p_Top = Process("Top", range=(2, 2))

        stack = Stack(p_signal, p_Ztt)
        stack.add_process(p_Top, histtype='points', data_uncertainty=False,
                          linestyle=':')

        self.assertEqual(stack.processes, [p_signal, p_Ztt, p_Top])
        self.assertEqual(stack.histtypes, [None, None, 'points'])
        self.assertEqual(stack.data_uncertainties, [None, None, False])
        self.assertEqual(stack.aux, [{}, {}, {'linestyle': ':'}])

    def test_is_data_uncertainty_default(self):
        """
        Check that is_data_uncertainty() returns the default uncertainty value
        when the process was added without a custom data_uncertainty value.
        """
        p_signal = Process("Signal", range=(0, 0))
        p_Ztt = Process(r"$Z\rightarrow\tau\tau$", range=(1, 1))
        p_Top = Process("Top", range=(2, 2))

        stack = Stack(p_signal, data_uncertainty=True)
        stack.add_process(p_Ztt)
        stack.add_process(p_Top)

        self.assertTrue(stack.is_data_uncertainty(1))

    def test_is_data_uncertainty_custom(self):
        """
        Check that is_data_uncertainty() returns the custom uncertainty value
        when the process was added with a custom data_uncertainty value.
        """
        p_signal = Process("Signal", range=(0, 0))
        p_Ztt = Process(r"$Z\rightarrow\tau\tau$", range=(1, 1))
        p_Top = Process("Top", range=(2, 2))

        stack = Stack(p_signal, data_uncertainty=True)
        stack.add_process(p_Ztt)
        stack.add_process(p_Top, data_uncertainty=False)

        self.assertFalse(stack.is_data_uncertainty(2))

    def test_is_data_uncertainty_init_process(self):
        """
        Check that is_data_uncertainty() returns the default uncertainty value
        when the process was passed to the constructor.
        """
        p_signal = Process("Signal", range=(0, 0))
        p_Ztt = Process(r"$Z\rightarrow\tau\tau$", range=(1, 1))
        p_Top = Process("Top", range=(2, 2))

        stack = Stack(p_signal, data_uncertainty=True)
        stack.add_process(p_Ztt)
        stack.add_process(p_Top, data_uncertainty=False)

        self.assertTrue(stack.is_data_uncertainty(0))

    def test_get_histtype_default(self):
        """
        Check that get_histtype() returns the default histtype when the
        process was added without a custom histtype.
        """
        p_signal = Process("Signal", range=(0, 0))
        p_Ztt = Process(r"$Z\rightarrow\tau\tau$", range=(1, 1))
        p_Top = Process("Top", range=(2, 2))

        stack = Stack(p_signal, histtype="step")
        stack.add_process(p_Ztt)
        stack.add_process(p_Top, histtype='points', data_uncertainty=False,
                          linestyle=':')

        self.assertEqual(stack.get_histtype(1), "step")

    def test_get_histtype_custom(self):
        """
        Check that get_histtype() returns the custom histtype when the
        process was added with a custom histtype.
        """
        p_signal = Process("Signal", range=(0, 0))
        p_Ztt = Process(r"$Z\rightarrow\tau\tau$", range=(1, 1))
        p_Top = Process("Top", range=(2, 2))

        stack = Stack(p_signal, histtype="step")
        stack.add_process(p_Ztt)
        stack.add_process(p_Top, histtype='points', data_uncertainty=False,
                          linestyle=':')

        self.assertEqual(stack.get_histtype(2), "points")

    def test_get_histtype_init_process(self):
        """
        Check that get_histtype() returns the default histtype when the
        process was passed to the constructor.
        """
        p_signal = Process("Signal", range=(0, 0))
        p_Ztt = Process(r"$Z\rightarrow\tau\tau$", range=(1, 1))
        p_Top = Process("Top", range=(2, 2))

        stack = Stack(p_signal, histtype="step")
        stack.add_process(p_Ztt)
        stack.add_process(p_Top, histtype='points', data_uncertainty=False,
                          linestyle=':')

        self.assertEqual(stack.get_histtype(0), "step")

    def test_not_none_or_default_None(self):
        """
        Check that not_none_or_default() returns the default if the value is
        None.
        """
        self.assertEqual(not_none_or_default(None, "Hello"), "Hello")

    def test_not_none_or_default_False(self):
        """
        Check that not_none_or_default() returns the value if the value is
        False.
        """
        # assertIs so that another falsy value (e.g. 0) cannot pass.
        self.assertIs(not_none_or_default(False, "Hello"), False)

    def test_not_none_or_default_NotNone(self):
        """
        Check that not_none_or_default() returns the value if the value is
        a non-empty string.
        """
        self.assertEqual(not_none_or_default("Hi", "Hello"), "Hi")

    def test_not_none_or_default_NoneDefault(self):
        """
        Check that not_none_or_default() copes with a default of None: a
        non-None value is returned unchanged and a None value yields None.
        """
        self.assertEqual(not_none_or_default("Hi", None), "Hi")
        self.assertIsNone(not_none_or_default(None, None))

    def test_get_aux_default(self):
        """
        Check that get_aux() returns the default aux values when the
        process was added without a custom aux value.
        """
        p_signal = Process("Signal", range=(0, 0))
        p_Ztt = Process(r"$Z\rightarrow\tau\tau$", range=(1, 1))
        p_Top = Process("Top", range=(2, 2))

        stack = Stack(p_signal, histtype="step", color='r', linestyle="-.")
        stack.add_process(p_Ztt)
        stack.add_process(p_Top, histtype='points', data_uncertainty=False,
                          linestyle=':', markersize=5)

        self.assertEqual(stack.get_aux(1), {'color': 'r', 'linestyle': '-.'})

    def test_get_aux_custom(self):
        """
        Check that get_aux() returns the default aux values updated with the
        custom aux values when the process was added with custom aux values.
        """
        p_signal = Process("Signal", range=(0, 0))
        p_Ztt = Process(r"$Z\rightarrow\tau\tau$", range=(1, 1))
        p_Top = Process("Top", range=(2, 2))

        stack = Stack(p_signal, histtype="step", color='r', linestyle="-.")
        stack.add_process(p_Ztt)
        stack.add_process(p_Top, histtype='points', data_uncertainty=False,
                          linestyle=':', markersize=5)

        self.assertEqual(stack.get_aux(2), {'color': 'r', 'linestyle': ':',
                                            'markersize': 5})

    def test_get_aux_init_process(self):
        """
        Check that get_aux() returns the default aux values when the
        process was passed to the constructor.
        """
        p_signal = Process("Signal", range=(0, 0))
        p_Ztt = Process(r"$Z\rightarrow\tau\tau$", range=(1, 1))
        p_Top = Process("Top", range=(2, 2))

        stack = Stack(p_signal, histtype="step", color='r', linestyle="-.")
        stack.add_process(p_Ztt)
        stack.add_process(p_Top, histtype='points', data_uncertainty=False,
                          linestyle=':', markersize=5)

        self.assertEqual(stack.get_aux(0), {'color': 'r', 'linestyle': '-.'})

    def test_get_aux_none_overwrite(self):
        """
        Check that passing None as an aux keyword overwrites the default.
        """
        p_signal = Process("Signal", range=(0, 0))
        p_Ztt = Process(r"$Z\rightarrow\tau\tau$", range=(1, 1))
        p_Top = Process("Top", range=(2, 2))

        stack = Stack(p_signal, histtype="step", color='r', linestyle="-.")
        stack.add_process(p_Ztt)
        stack.add_process(p_Top, histtype='points', data_uncertainty=False,
                          linestyle=None)

        self.assertEqual(stack.get_aux(2), {'color': 'r', 'linestyle': None})

    def test_get_aux_not_modify_outer(self):
        """
        Check that calling get_aux() does not modify dicts from the
        constructor, add_process() or the internal one.
        """
        p_signal = Process("Signal", range=(0, 0))
        p_Ztt = Process(r"$Z\rightarrow\tau\tau$", range=(1, 1))
        p_Top = Process("Top", range=(2, 2))

        constr_dict = {'color': 'r', 'linestyle': '-.'}
        stack = Stack(p_signal, histtype='step', **constr_dict)
        stack.add_process(p_Ztt)
        addprocess_dict = {'linestyle': None}
        stack.add_process(p_Top, histtype='points', data_uncertainty=False,
                          **addprocess_dict)

        # Only the side effects matter here; the return values are ignored.
        stack.get_aux(0)
        stack.get_aux(1)
        stack.get_aux(2)

        self.assertEqual(constr_dict, {'color': 'r', 'linestyle': '-.'})
        self.assertEqual(stack.default_aux, {'color': 'r', 'linestyle': '-.'})
        self.assertEqual(addprocess_dict, {'linestyle': None})
        self.assertEqual(stack.aux, [{}, {}, {'linestyle': None}])

    def test_len_init(self):
        """
        Check that len() returns the number of processes passed to the
        constructor when add_process() is not called.
        """
        p_signal = Process("Signal", range=(0, 0))
        p_Top = Process("Top", range=(2, 2))

        stack = Stack(p_signal, p_Top)

        self.assertEqual(len(stack), 2)

    def test_len_add(self):
        """
        Check that len() counts both the processes passed to the constructor
        and the processes added afterwards with add_process().
        """
        p_signal = Process("Signal", range=(0, 0))
        p_Top = Process("Top", range=(2, 2))
        p_Ztt = Process(r"$Z\rightarrow\tau\tau$", range=(1, 1))

        stack = Stack(p_signal, p_Top)
        stack.add_process(p_Ztt)

        self.assertEqual(len(stack), 3)

    def test_total(self):
        """
        Check that get_total() returns the correct bin entries.
        """
        # The last row (fpid 0, weight 42) is not selected by p_all and must
        # therefore not show up in any bin.
        df = pd.DataFrame({
            "x": [-0.99, -1.67, -0.49, -0.97, 1.14, 0.78, 0.43, 1.24, -1.02,
                  0.77, 0],
            "weight": [1.1, 0.9, 1.2, 0.8, 1.3, 0.7, 1.4, 0.6, 1.5, 0.5, 42],
            "fpid": [1]*10 + [0]})
        bins = np.array([-2, -1, 0, 1, 2])
        v_x = Variable("x", "x")
        v_weight = Variable("weight", "weight")
        p_all = Process("All", lambda d: d.fpid == 1)

        stack = Stack(p_all)
        histogram = stack.get_total(df, bins, v_x, v_weight)

        self.assertAlmostEqual(histogram[0], 0.9 + 1.5)
        self.assertAlmostEqual(histogram[1], 1.1 + 0.8 + 1.2)
        self.assertAlmostEqual(histogram[2], 1.4 + 0.5 + 0.7)
        self.assertAlmostEqual(histogram[3], 1.3 + 0.6)

    def test_hist(self):
        """
        Check that get_hist() returns the histogram for a single process when
        the i argument is used.
        """
        df, bins, v_x, v_weight, p_first, p_second = self._toy_inputs()
        p_all = Process("All", lambda d: d.fpid >= 0)

        stack = Stack(p_first, p_second, p_all)

        h_first = stack.get_hist(df, 0, bins, v_x, v_weight)
        self.assertAlmostEqual(h_first[0], 0.9 + 1.5)
        self.assertAlmostEqual(h_first[1], 1.1 + 0.8 + 1.2)
        self.assertAlmostEqual(h_first[2], 1.4 + 0.5 + 0.7)
        self.assertAlmostEqual(h_first[3], 1.3 + 0.6)

        h_second = stack.get_hist(df, 1, bins, v_x, v_weight)
        self.assertAlmostEqual(h_second[0], 0.4 + 2.0)
        self.assertAlmostEqual(h_second[1], 1.6 + 0.3 + 1.7)
        self.assertAlmostEqual(h_second[2], 1.9 + 1.0 + 0.2)
        self.assertAlmostEqual(h_second[3], 1.8 + 0.1)

        # p_all selects exactly the union of p_first and p_second.
        h_all = stack.get_hist(df, 2, bins, v_x, v_weight)
        self.assertTrue((abs(h_all - h_first - h_second) < 1e-12).all())

    def test_uncertainty_only_data(self):
        """
        Check that get_total_uncertainty() returns sqrt(N) when all processes
        are data.
        """
        df, bins, v_x, v_weight, p_first, p_second = self._toy_inputs()

        stack = Stack(p_first, p_second, data_uncertainty=True)
        uncertainty = stack.get_total_uncertainty(df, bins, v_x, v_weight)

        self.assertAlmostEqual(uncertainty[0],
                               np.sqrt(0.9 + 1.5 + 0.4 + 2.0))
        self.assertAlmostEqual(uncertainty[1],
                               np.sqrt(1.1 + 0.8 + 1.2 + 1.6 + 0.3 + 1.7))
        self.assertAlmostEqual(uncertainty[2],
                               np.sqrt(1.4 + 0.5 + 0.7 + 1.9 + 1.0 + 0.2))
        self.assertAlmostEqual(uncertainty[3],
                               np.sqrt(1.3 + 0.6 + 1.8 + 0.1))

    def test_uncertainty_only_mc(self):
        """
        Check that get_total_uncertainty() returns sqrt(sum w2) when all
        processes are mc.
        """
        df, bins, v_x, v_weight, p_first, p_second = self._toy_inputs()

        stack = Stack(p_first, p_second, data_uncertainty=False)
        uncertainty = stack.get_total_uncertainty(df, bins, v_x, v_weight)

        self.assertAlmostEqual(
            uncertainty[0],
            np.sqrt((np.array([0.9, 1.5, 0.4, 2.0])**2).sum()))
        self.assertAlmostEqual(
            uncertainty[1],
            np.sqrt((np.array([1.1, 0.8, 1.2, 1.6, 0.3, 1.7])**2).sum()))
        self.assertAlmostEqual(
            uncertainty[2],
            np.sqrt((np.array([1.4, 0.5, 0.7, 1.9, 1.0, 0.2])**2).sum()))
        self.assertAlmostEqual(
            uncertainty[3],
            np.sqrt((np.array([1.3, 0.6, 1.8, 0.1])**2).sum()))

    def test_mc_stack(self):
        """
        Check that McStack and a generic Stack created with
        data_uncertainty=False agree on the total uncertainty.
        """
        df, bins, v_x, v_weight, p_first, p_second = self._toy_inputs()

        stack = Stack(p_first, p_second, data_uncertainty=False)
        mc_stack = McStack(p_first, p_second)

        self.assertTrue((abs(
            stack.get_total_uncertainty(df, bins, v_x, v_weight)
            - mc_stack.get_total_uncertainty(df, bins, v_x, v_weight))
            < 1e-12).all())

    def test_mc_stack_raises(self):
        """
        Check that McStack raises a TypeError if the data_uncertainty
        argument is passed to the constructor or to add_process().
        """
        p_first = Process("First", lambda d: d.fpid == 0)
        p_second = Process("Second", lambda d: d.fpid == 1)

        self.assertRaises(TypeError, McStack, p_first, p_second,
                          data_uncertainty=True)

        stack = McStack(p_first)
        self.assertRaises(TypeError, stack.add_process, p_second,
                          data_uncertainty=True)

    def test_data_stack(self):
        """
        Check that DataStack and a generic Stack created with
        data_uncertainty=True agree on the total uncertainty.
        """
        df, bins, v_x, v_weight, p_first, p_second = self._toy_inputs()

        stack = Stack(p_first, p_second, data_uncertainty=True)
        data_stack = DataStack(p_first, p_second)

        self.assertTrue((abs(
            stack.get_total_uncertainty(df, bins, v_x, v_weight)
            - data_stack.get_total_uncertainty(df, bins, v_x, v_weight))
            < 1e-12).all())

    def test_data_stack_raises(self):
        """
        Check that DataStack raises a TypeError if the data_uncertainty
        argument is passed to the constructor or to add_process().
        """
        p_first = Process("First", lambda d: d.fpid == 0)
        p_second = Process("Second", lambda d: d.fpid == 1)

        self.assertRaises(TypeError, DataStack, p_first, p_second,
                          data_uncertainty=True)

        stack = DataStack(p_first)
        self.assertRaises(TypeError, stack.add_process, p_second,
                          data_uncertainty=True)