# ViolaJones/python/common.py
#
# 136 lines
# 5.2 KiB
# Python

from toolbox import pickle_multi_loader, format_time_ns, unit_test_argsort_2d, header, footer, formatted_line, formatted_row
from typing import List, Tuple
from time import perf_counter_ns
from sys import stderr
import numpy as np
from config import OUT_DIR, DATA_DIR, __DEBUG
def unit_test(TS: List[int], labels: List[str] = ['CPU', 'GPU', 'PY', 'PGPU'], tol: float = 1e-8) -> None:
    """Check that the results of every device are equal to each other.

    ViolaJones is a fully deterministic algorithm, so the results should be the same regardless of the
    device (up to floating point fluctuations). This function checks that assertion by loading the
    stored results of each device and comparing them pairwise: two results are considered equal when
    their mean absolute difference is below ``tol``.

    One table row is printed per comparison (state and time spent), followed by a summary row.
    Results that are loaded as None are skipped (the skipped row is only printed when __DEBUG is set).

    Args:
        TS (List[int]): Number of trained weak classifiers
        labels (List[str], optional): List of the trained device names. Defaults to ['CPU', 'GPU', 'PY', 'PGPU'] (see config.py for more info)
        tol (float, optional): Float difference tolerance. Defaults to 1e-8
    """
    # NOTE: the default list of labels is only read, never mutated, so sharing it across calls is harmless
    if len(labels) < 2:
        print('Not enough devices to test')
        return

    unit_gaps = [37, -10, -18, 29]
    header(unit_gaps, ['Unit testing', 'Test state', 'Time spent (ns)', 'Formatted time spent'])

    unit_timestamp = perf_counter_ns()
    n_total, n_success = 0, 0

    def test_fnc(title, fnc):
        """Run a single check, time it, print its row and update the counters."""
        nonlocal n_total, n_success
        n_total += 1
        start = perf_counter_ns()
        state = fnc()
        elapsed = perf_counter_ns() - start
        if state:
            n_success += 1
        formatted_row(unit_gaps, [title, 'Passed' if state else 'Failed', f'{elapsed:,}', format_time_ns(elapsed)])

    def skipped_row(title):
        """Print a 'Skipped' row, only in debug mode."""
        if __DEBUG:
            formatted_row(unit_gaps, [title, 'Skipped', 'None', 'None'])

    def compare_devices(title, results, argsort_source = None):
        """Compare every unordered pair of device results sharing the same title.

        When argsort_source is given, each available result is first validated against it
        with unit_test_argsort_2d before being compared to the other devices.
        """
        for i, (b1, l1) in enumerate(zip(results, labels)):
            if b1 is None:
                skipped_row(f'{title:<22} - {l1:<12}')
                continue
            if argsort_source is not None:
                test_fnc(f'{title:<22} - {l1:<4} argsort', lambda b1 = b1: unit_test_argsort_2d(argsort_source, b1))
            for j, (b2, l2) in enumerate(zip(results, labels)):
                # Only test each unordered pair once and never a device against itself
                if j <= i:
                    continue
                pair_title = f'{title:<22} - {l1:<4} vs {l2:<4}'
                if b2 is None:
                    skipped_row(pair_title)
                    continue
                test_fnc(pair_title, lambda b1 = b1, b2 = b2: np.abs(b1 - b2).mean() < tol)

    for set_name in ['train', 'test']:
        for filename in ['ii', 'feat']:
            title = f'X_{set_name}_{filename}'
            print(f'{filename}...', file = stderr, end = '\r')
            bs = pickle_multi_loader([f'{title}_{label}' for label in labels], OUT_DIR)
            compare_devices(title, bs)

        title = f'X_{set_name}_feat_argsort'
        print(f'Loading {title}...', file = stderr, end = '\r')
        # The first available feature matrix (from any device) is used as the reference of the argsort check
        feat = None
        bs = []
        for label in labels:
            if feat is None:
                feat = pickle_multi_loader([f'X_{set_name}_feat_{label}'], OUT_DIR)[0]
            bs.append(pickle_multi_loader([f'{title}_{label}'], OUT_DIR)[0])
        compare_devices(title, bs, argsort_source = feat)

    for T in TS:
        for filename in ['alphas', 'final_classifiers']:
            print(f'{filename}_{T}...', file = stderr, end = '\r')
            # NOTE(review): unlike the loads above, no directory is given here so the loader default is used - confirm this is intended
            bs = pickle_multi_loader([f'{filename}_{T}_{label}' for label in labels])
            compare_devices(f'{filename}_{T}', bs)

    time_spent = perf_counter_ns() - unit_timestamp
    if n_total == 0:
        formatted_row(unit_gaps, ['Unit testing summary', 'No files', f'{time_spent:,}', format_time_ns(time_spent)])
    else:
        # NOTE(review): the four separator arguments are empty strings - confirm no characters were lost here
        formatted_line(unit_gaps, '', '', '', '')
        formatted_row(unit_gaps, ['Unit testing summary', f'{n_success}/{n_total}', f'{time_spent:,}', format_time_ns(time_spent)])
    footer(unit_gaps)
def load_datasets(data_dir: str = DATA_DIR) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Load the training and testing datasets.

    Each dataset is read from ``{data_dir}/{set_name}.bin``, opened as text: the first line holds the
    shape of the array and the second line holds its flattened values, both as whitespace separated
    integers. The values are stored as np.uint8 and reshaped to the given shape.

    Args:
        data_dir (str, optional): Directory containing the dataset files. Defaults to DATA_DIR (see config.py)

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: X_train, y_train, X_test, y_test

    Raises:
        OSError: If a dataset file cannot be opened
        ValueError: If a line contains a non integer token or the values do not fit the shape
    """
    def parse_ints(line: str) -> List[int]:
        # split() without separator ignores the trailing newline and any run of whitespace,
        # whereas split(' ') would yield empty tokens (and int('') would fail) on repeated spaces
        return [int(token) for token in line.split()]

    def load(set_name: str) -> np.ndarray:
        with open(f'{data_dir}/{set_name}.bin', 'r') as f:
            shape = parse_ints(f.readline())
            values = parse_ints(f.readline())
        return np.asarray(values, dtype = np.uint8).reshape(shape)

    return load('X_train'), load('y_train'), load('X_test'), load('y_test')