python : improved documentation

This commit is contained in:
saundersp 2024-04-28 22:35:42 +02:00
parent c71b04f00d
commit 4a42747837
5 changed files with 78 additions and 77 deletions

View File

@ -18,7 +18,7 @@ else:
@njit('uint8[:, :, :, :](uint16, uint16)') @njit('uint8[:, :, :, :](uint16, uint16)')
def build_features(width: int, height: int) -> np.ndarray: def build_features(width: int, height: int) -> np.ndarray:
"""Initialize the features base on the input shape. """Initialize the features based on the input shape.
Args: Args:
shape (Tuple[int, int]): Shape of the image (Width, Height) shape (Tuple[int, int]): Shape of the image (Width, Height)
@ -90,9 +90,31 @@ def classify_weak_clf(x_feat_i: np.ndarray, threshold: int, polarity: int) -> np
res[polarity * x_feat_i < polarity * threshold] = 1 res[polarity * x_feat_i < polarity * threshold] = 1
return res return res
@njit('uint8[:](float64[:], int32[:, :], int32[:, :])')
def classify_viola_jones(alphas: np.ndarray, classifiers: np.ndarray, X_feat: np.ndarray) -> np.ndarray:
"""Classify the trained classifiers on the given features.
Args:
alphas (np.ndarray): Trained alphas
classifiers (np.ndarray): Trained classifiers
X_feat (np.ndarray): Integrated features
Returns:
np.ndarray: Classification results
"""
total = np.zeros(X_feat.shape[1], dtype = np.float64)
for i, alpha in enumerate(tqdm_iter(alphas, "Classifying ViolaJones")):
(j, threshold, polarity) = classifiers[i]
total += alpha * classify_weak_clf(X_feat[j], threshold, polarity)
y_pred = np.zeros(X_feat.shape[1], dtype = np.uint8)
y_pred[total >= 0.5 * np.sum(alphas)] = 1
return y_pred
@njit('Tuple((int32, float64, float64[:]))(int32[:, :], float64[:], int32[:, :], uint8[:])') @njit('Tuple((int32, float64, float64[:]))(int32[:, :], float64[:], int32[:, :], uint8[:])')
def select_best(classifiers: np.ndarray, weights: np.ndarray, X_feat: np.ndarray, y: np.ndarray) -> Tuple[int, float, np.ndarray]: def select_best(classifiers: np.ndarray, weights: np.ndarray, X_feat: np.ndarray, y: np.ndarray) -> Tuple[int, float, np.ndarray]:
"""Select the best classifier given theirs predictions. """Select the best classifier given their predictions.
Args: Args:
classifiers (np.ndarray): The weak classifiers classifiers (np.ndarray): The weak classifiers
@ -139,28 +161,6 @@ def train_viola_jones(T: int, X_feat: np.ndarray, X_feat_argsort: np.ndarray, y:
return alphas, final_classifier return alphas, final_classifier
@njit('uint8[:](float64[:], int32[:, :], int32[:, :])')
def classify_viola_jones(alphas: np.ndarray, classifiers: np.ndarray, X_feat: np.ndarray) -> np.ndarray:
"""Classify the trained classifiers on the given features.
Args:
alphas (np.ndarray): Trained alphas
classifiers (np.ndarray): Trained classifiers
X_feat (np.ndarray): Integrated features
Returns:
np.ndarray: Classification results
"""
total = np.zeros(X_feat.shape[1], dtype = np.float64)
for i, alpha in enumerate(tqdm_iter(alphas, "Classifying ViolaJones")):
(j, threshold, polarity) = classifiers[i]
total += alpha * classify_weak_clf(X_feat[j], threshold, polarity)
y_pred = np.zeros(X_feat.shape[1], dtype = np.uint8)
y_pred[total >= 0.5 * np.sum(alphas)] = 1
return y_pred
#@njit #@njit
#def get_best_anova_features(X: np.ndarray, y: np.ndarray) -> np.ndarray: #def get_best_anova_features(X: np.ndarray, y: np.ndarray) -> np.ndarray:
# #SelectPercentile(f_classif, percentile = 10).fit(X, y).get_support(indices = True) # #SelectPercentile(f_classif, percentile = 10).fit(X, y).get_support(indices = True)

View File

@ -175,7 +175,7 @@ def argsort_bounded(d_a: np.ndarray, d_indices: np.ndarray, low: int, high: int)
stack[top] = high stack[top] = high
@njit('uint16[:, :](int32[:, :])') @njit('uint16[:, :](int32[:, :])')
def argsort(X_feat: np.ndarray) -> np.ndarray: def argsort_2d(X_feat: np.ndarray) -> np.ndarray:
"""Perform an indirect sort of a given array. """Perform an indirect sort of a given array.
Args: Args:

View File

@ -32,8 +32,8 @@ def __kernel_scan_3d__(n: int, j: int, d_inter: np.ndarray, d_a: np.ndarray) ->
Args: Args:
n (int): Number of width blocks n (int): Number of width blocks
j (int): Temporary sum index j (int): Temporary sum index
d_inter (np.ndarray): Temporary sums in device to add d_inter (np.ndarray): Temporary sums on device to add
d_a (np.ndarray): Dataset of images in device to apply sum d_a (np.ndarray): Dataset of images on device to apply sum
""" """
x_coor, y_coor = cuda.grid(2) x_coor, y_coor = cuda.grid(2)
@ -76,8 +76,8 @@ def __add_3d__(d_X: np.ndarray, d_s: np.ndarray, n: int, m: int) -> None:
"""GPU kernel for parallel sum. """GPU kernel for parallel sum.
Args: Args:
d_X (np.ndarray): Dataset of images in device d_X (np.ndarray): Dataset of images on device
d_s (np.ndarray): Temporary sums in device to add d_s (np.ndarray): Temporary sums on device to add
n (int): Number of width blocks n (int): Number of width blocks
m (int): Height of a block m (int): Height of a block
""" """
@ -131,7 +131,7 @@ def __transpose_kernel__(d_X: np.ndarray, d_Xt: np.ndarray) -> None:
"""GPU kernel of the function __transpose_3d__. """GPU kernel of the function __transpose_3d__.
Args: Args:
d_X (np.ndarray): Dataset of images in device d_X (np.ndarray): Dataset of images on device
d_Xt(np.ndarray): Transposed dataset of images d_Xt(np.ndarray): Transposed dataset of images
width (int): Width of each images in the dataset width (int): Width of each images in the dataset
height (int): Height of each images in the dataset height (int): Height of each images in the dataset
@ -184,11 +184,11 @@ def __train_weak_clf_kernel__(d_classifiers: np.ndarray, d_y: np.ndarray, d_X_fe
"""GPU kernel of the function train_weak_clf. """GPU kernel of the function train_weak_clf.
Args: Args:
d_classifiers (np.ndarray): Weak classifiers to train d_classifiers (np.ndarray): Weak classifiers on device to train
d_y (np.ndarray): Labels of the features d_y (np.ndarray): Labels of the features on device
d_X_feat (np.ndarray): Feature images dataset d_X_feat (np.ndarray): Feature images dataset on device
d_X_feat_argsort (np.ndarray): Sorted indexes of the integrated features d_X_feat_argsort (np.ndarray): Sorted indexes of the integrated features on device
d_weights (np.ndarray): Weights of the features d_weights (np.ndarray): Weights of the features on device
total_pos (float): Total of positive labels in the dataset total_pos (float): Total of positive labels in the dataset
total_neg (float): Total of negative labels in the dataset total_neg (float): Total of negative labels in the dataset
""" """
@ -259,29 +259,29 @@ def __compute_feature__(ii: np.ndarray, x: int, y: int, w: int, h: int) -> int:
return ii[y + h, x + w] + ii[y, x] - ii[y + h, x] - ii[y, x + w] return ii[y + h, x + w] + ii[y, x] - ii[y + h, x] - ii[y, x + w]
@cuda.jit('void(int32[:, :], uint8[:, :, :, :], uint32[:, :, :])') @cuda.jit('void(int32[:, :], uint8[:, :, :, :], uint32[:, :, :])')
def __apply_feature_kernel__(X_feat: np.ndarray, feats: np.ndarray, X_ii: np.ndarray) -> None: def __apply_feature_kernel__(d_X_feat: np.ndarray, d_feats: np.ndarray, d_X_ii: np.ndarray) -> None:
"""GPU kernel of the function apply_features. """GPU kernel of the function apply_features.
Args: Args:
X_feat (np.ndarray): Feature images dataset on device d_X_feat (np.ndarray): Feature images dataset on device
feats (np.ndarray): Features on device to apply d_feats (np.ndarray): Features on device to apply
X_ii (np.ndarray): Integrated image dataset on device d_X_ii (np.ndarray): Integrated image dataset on device
n (int): Number of features n (int): Number of features
m (int): Number of images of the dataset m (int): Number of images of the dataset
""" """
x, y = cuda.grid(2) x, y = cuda.grid(2)
if x >= feats.shape[0] or y >= X_ii.shape[0]: if x >= d_feats.shape[0] or y >= d_X_ii.shape[0]:
return return
p_x, p_y, p_w, p_h = feats[x, 0, 0] p_x, p_y, p_w, p_h = d_feats[x, 0, 0]
p1_x, p1_y, p1_w, p1_h = feats[x, 0, 1] p1_x, p1_y, p1_w, p1_h = d_feats[x, 0, 1]
n_x, n_y, n_w, n_h = feats[x, 1, 0] n_x, n_y, n_w, n_h = d_feats[x, 1, 0]
n1_x, n1_y, n1_w, n1_h = feats[x, 1, 1] n1_x, n1_y, n1_w, n1_h = d_feats[x, 1, 1]
sP = __compute_feature__(X_ii[y], p_x, p_y, p_w, p_h) + \ sP = __compute_feature__(d_X_ii[y], p_x, p_y, p_w, p_h) + \
__compute_feature__(X_ii[y], p1_x, p1_y, p1_w, p1_h) __compute_feature__(d_X_ii[y], p1_x, p1_y, p1_w, p1_h)
sN = __compute_feature__(X_ii[y], n_x, n_y, n_w, n_h) + \ sN = __compute_feature__(d_X_ii[y], n_x, n_y, n_w, n_h) + \
__compute_feature__(X_ii[y], n1_x, n1_y, n1_w, n1_h) __compute_feature__(d_X_ii[y], n1_x, n1_y, n1_w, n1_h)
X_feat[x, y] = sP - sN d_X_feat[x, y] = sP - sN
#@njit('int32[:, :](uint8[:, :, :, :], uint32[:, :, :])') #@njit('int32[:, :](uint8[:, :, :, :], uint32[:, :, :])')
def apply_features(feats: np.ndarray, X_ii: np.ndarray) -> np.ndarray: def apply_features(feats: np.ndarray, X_ii: np.ndarray) -> np.ndarray:
@ -303,7 +303,7 @@ def apply_features(feats: np.ndarray, X_ii: np.ndarray) -> np.ndarray:
return d_X_feat.copy_to_host() return d_X_feat.copy_to_host()
@cuda.jit('int32(int32[:], uint16[:], int32, int32)', device = True) @cuda.jit('int32(int32[:], uint16[:], int32, int32)', device = True)
def _as_partition_(d_a: np.ndarray, d_indices: np.ndarray, l: int, h: int) -> int: def _as_partition_(d_a: np.ndarray, d_indices: np.ndarray, low: int, high: int) -> int:
"""Partition of the argsort algorithm. """Partition of the argsort algorithm.
Args: Args:
@ -315,10 +315,10 @@ def _as_partition_(d_a: np.ndarray, d_indices: np.ndarray, l: int, h: int) -> in
Returns: Returns:
int: Last index sorted int: Last index sorted
""" """
i = l - 1 i = low - 1
j = l j = low
for j in range(l, h + 1): for j in range(low, high + 1):
if d_a[d_indices[j]] < d_a[d_indices[h]]: if d_a[d_indices[j]] < d_a[d_indices[high]]:
i += 1 i += 1
d_indices[i], d_indices[j] = d_indices[j], d_indices[i] d_indices[i], d_indices[j] = d_indices[j], d_indices[i]
@ -368,11 +368,11 @@ def argsort_bounded(d_a: np.ndarray, d_indices: np.ndarray, low: int, high: int)
@cuda.jit('void(int32[:, :], uint16[:, :])') @cuda.jit('void(int32[:, :], uint16[:, :])')
def argsort_flatter(d_a: np.ndarray, d_indices: np.ndarray) -> None: def argsort_flatter(d_a: np.ndarray, d_indices: np.ndarray) -> None:
# TODO Finish doxygen # TODO Finish doxygen
"""Cuda kernel where argsort is applied to every columns of a given 2D array. """Cuda kernel where argsort is applied to every column of a given 2D array.
Args: Args:
d_a (np.ndarray): Array in device to sort d_a (np.ndarray): 2D Array on device to sort
d_indices (np.ndarray): Array of indices on device to write to d_indices (np.ndarray): 2D Array of indices on device to write to
""" """
i = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x i = cuda.blockIdx.x * cuda.blockDim.x + cuda.threadIdx.x
if i < d_a.shape[0]: if i < d_a.shape[0]:
@ -380,19 +380,19 @@ def argsort_flatter(d_a: np.ndarray, d_indices: np.ndarray) -> None:
d_indices[i, j] = j d_indices[i, j] = j
argsort_bounded(d_a[i], d_indices[i], 0, d_a.shape[1] - 1) argsort_bounded(d_a[i], d_indices[i], 0, d_a.shape[1] - 1)
def argsort(a: np.ndarray) -> np.ndarray: def argsort_2d(a: np.ndarray) -> np.ndarray:
"""Perform an indirect sort of a given array """Perform an indirect sort on each column of a given 2D array
Args: Args:
a (np.ndarray): Array to sort a (np.ndarray): 2D Array to sort
Returns: Returns:
np.ndarray: Array of indices that sort the array np.ndarray: 2D Array of indices that sort the array
""" """
indices = np.empty_like(a, dtype = np.uint16) indices = np.empty_like(a, dtype = np.uint16)
n_blocks = int(np.ceil(np.divide(a.shape[0], NB_THREADS))) n_blocks = int(np.ceil(np.divide(a.shape[0], NB_THREADS)))
d_X_feat = cuda.to_device(a) d_a = cuda.to_device(a)
d_indices = cuda.to_device(indices) d_indices = cuda.to_device(indices)
argsort_flatter[n_blocks, NB_THREADS](d_X_feat, d_indices) argsort_flatter[n_blocks, NB_THREADS](d_a, d_indices)
cuda.synchronize() cuda.synchronize()
return d_indices.copy_to_host() return d_indices.copy_to_host()

View File

@ -20,13 +20,13 @@ if __DEBUG:
from config import IDX_INSPECT, IDX_INSPECT_OFFSET from config import IDX_INSPECT, IDX_INSPECT_OFFSET
if GPU_BOOSTED: if GPU_BOOSTED:
from ViolaJonesGPU import apply_features, set_integral_image, argsort from ViolaJonesGPU import apply_features, set_integral_image, argsort_2d
label = 'GPU' if COMPILE_WITH_C else 'PGPU' label = 'GPU' if COMPILE_WITH_C else 'PGPU'
# The parallel prefix sum doesn't use the whole GPU so numba output some annoying warnings, this disables it # The parallel prefix sum doesn't use the whole GPU so numba output some annoying warnings, this disables it
from numba import config from numba import config
config.CUDA_LOW_OCCUPANCY_WARNINGS = 0 config.CUDA_LOW_OCCUPANCY_WARNINGS = 0
else: else:
from ViolaJonesCPU import apply_features, set_integral_image, argsort from ViolaJonesCPU import apply_features, set_integral_image, argsort_2d
label = 'CPU' if COMPILE_WITH_C else 'PY' label = 'CPU' if COMPILE_WITH_C else 'PY'
def preprocessing() -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: def preprocessing() -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
@ -37,7 +37,7 @@ def preprocessing() -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.
- Calculate features - Calculate features
- Calculate integral images - Calculate integral images
- Apply features to images - Apply features to images
- Calculate argsort of the featured images. - Calculate argsort of the featured images
Returns: Returns:
Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: Tuple containing in order : training features, training features sorted indexes, training labels, testing features, testing labels Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: Tuple containing in order : training features, training features sorted indexes, training labels, testing features, testing labels
@ -119,7 +119,7 @@ def preprocessing() -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.
# X_train_feat, X_test_feat = X_train_feat[indices], X_test_feat[indices] # X_train_feat, X_test_feat = X_train_feat[indices], X_test_feat[indices]
X_train_feat_argsort = state_saver(f'Precalculating training set argsort ({label})', preproc_gaps[0], f'X_train_feat_argsort_{label}', X_train_feat_argsort = state_saver(f'Precalculating training set argsort ({label})', preproc_gaps[0], f'X_train_feat_argsort_{label}',
lambda: argsort(X_train_feat), FORCE_REDO, SAVE_STATE) lambda: argsort_2d(X_train_feat), FORCE_REDO, SAVE_STATE)
if __DEBUG: if __DEBUG:
print('X_train_feat_argsort') print('X_train_feat_argsort')
@ -128,7 +128,7 @@ def preprocessing() -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.
benchmark_function('Arg unit test', preproc_gaps[0], lambda: unit_test_argsort_2d(X_train_feat, X_train_feat_argsort)) benchmark_function('Arg unit test', preproc_gaps[0], lambda: unit_test_argsort_2d(X_train_feat, X_train_feat_argsort))
X_test_feat_argsort = state_saver(f'Precalculating testing set argsort ({label})', preproc_gaps[0], f'X_test_feat_argsort_{label}', X_test_feat_argsort = state_saver(f'Precalculating testing set argsort ({label})', preproc_gaps[0], f'X_test_feat_argsort_{label}',
lambda: argsort(X_test_feat), FORCE_REDO, SAVE_STATE) lambda: argsort_2d(X_test_feat), FORCE_REDO, SAVE_STATE)
if __DEBUG: if __DEBUG:
print('X_test_feat_argsort') print('X_test_feat_argsort')

View File

@ -4,7 +4,7 @@ import numpy as np
from sys import stderr from sys import stderr
import pickle import pickle
import os import os
from config import MODEL_DIR, OUT_DIR from config import MODEL_DIR, OUT_DIR, __DEBUG
from decorators import njit from decorators import njit
def formatted_row(gaps: list[int], titles: list[str], separator: str = '') -> None: def formatted_row(gaps: list[int], titles: list[str], separator: str = '') -> None:
@ -49,7 +49,7 @@ def header(gaps: list[int], titles: list[str]) -> None:
formatted_line(gaps, '', '', '', '') formatted_line(gaps, '', '', '', '')
def footer(gaps: list[int]) -> None: def footer(gaps: list[int]) -> None:
"""Print a formatted fooder with the given sizes """Print a formatted footer with the given sizes.
Args: Args:
gaps: List of size gaps gaps: List of size gaps
@ -128,7 +128,7 @@ def pickle_multi_loader(filenames: List[str], save_dir: str = MODEL_DIR) -> List
return b return b
def benchmark_function(step_name: str, column_width: int, fnc: Callable) -> Any: def benchmark_function(step_name: str, column_width: int, fnc: Callable) -> Any:
"""Benchmark a function and display the result of stdout. """Benchmark a function and display the result in stdout.
Args: Args:
step_name (str): Name of the function to call step_name (str): Name of the function to call
@ -202,14 +202,14 @@ def state_saver(step_name: str, column_width: int, filename: Union[str, List[str
@njit('boolean(int32[:, :], uint16[:, :])') @njit('boolean(int32[:, :], uint16[:, :])')
def unit_test_argsort_2d(arr: np.ndarray, indices: np.ndarray) -> bool: def unit_test_argsort_2d(arr: np.ndarray, indices: np.ndarray) -> bool:
"""Test if a given array of indices sort a given array. """Test if a given 2D array of indices sort a given 2D array.
Args: Args:
arr (np.ndarray): Array of data arr (np.ndarray): 2D Array of data
indices (np.ndarray): Indices that sort arr indices (np.ndarray): 2D Indices that sort the array
Returns: Returns:
bool: Success of the test bool: Whether the test was successful
""" """
n = indices.shape[0] n = indices.shape[0]
total = indices.shape[0] * indices.shape[1] total = indices.shape[0] * indices.shape[1]
@ -217,6 +217,7 @@ def unit_test_argsort_2d(arr: np.ndarray, indices: np.ndarray) -> bool:
for j in range(sub_indices.shape[0] - 1): for j in range(sub_indices.shape[0] - 1):
if arr[i, sub_indices[j]] <= arr[i, sub_indices[j + 1]]: if arr[i, sub_indices[j]] <= arr[i, sub_indices[j + 1]]:
n += 1 n += 1
if __DEBUG:
if n != total: if n != total:
print(n, total, n / (total)) print(n, total, n / (total))
return n == total return n == total