Source code for audmetric.core.api

from __future__ import annotations

import collections
from collections.abc import Callable
from collections.abc import Sequence
import math
from typing import TYPE_CHECKING
import warnings

import numpy as np


if TYPE_CHECKING:
    import pandas as pd

import audeer

from audmetric.core.utils import END
from audmetric.core.utils import FILE
from audmetric.core.utils import START
from audmetric.core.utils import assert_equal_length
from audmetric.core.utils import infer_labels
from audmetric.core.utils import is_segmented_index
from audmetric.core.utils import scores_per_subgroup_and_class


def accuracy(
    truth: Sequence[object],
    prediction: Sequence[object],
    labels: Sequence[str | int] = None,
) -> float:
    r"""Classification accuracy.

    .. math::

        \text{accuracy} = \frac{\text{number of correct predictions}}
                               {\text{number of total predictions}}

    Args:
        truth: ground truth values/classes
        prediction: predicted values/classes
        labels: included labels in preferred ordering.
            Sample is considered in computation
            if either prediction or ground truth
            (logical OR) is contained in labels.
            If no labels are supplied,
            they will be inferred from
            :math:`\{\text{prediction}, \text{truth}\}`
            and ordered alphabetically.

    Returns:
        accuracy of prediction :math:`\in [0, 1]`

    Raises:
        ValueError: if ``truth`` and ``prediction`` differ in length

    Examples:
        >>> accuracy([0, 0], [0, 1])
        0.5

    """
    assert_equal_length(truth, prediction)

    if labels is None:
        labels = infer_labels(truth, prediction)

    prediction = np.array(prediction)
    truth = np.array(truth)

    # keep samples where either prediction or truth is contained in `labels`
    label_mask = np.nonzero(
        np.logical_or(np.isin(truth, labels), np.isin(prediction, labels))
    )
    truth = truth[label_mask]
    prediction = prediction[label_mask]

    if len(prediction) == 0:
        return np.nan
    else:
        return float(sum(prediction == truth) / len(prediction))

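
# Illustrative sketch (not part of the audmetric source): how the ``labels``
# argument filters samples. A sample is kept if its truth *or* prediction is
# contained in ``labels``, so the third pair below ("c" vs. "c") is dropped
# and only the first two samples are scored.
#
#     >>> accuracy(["a", "b", "c"], ["a", "a", "c"], labels=["a", "b"])
#     0.5
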
def concordance_cc(
    truth: Sequence[float],
    prediction: Sequence[float],
    *,
    ignore_nan: bool = False,
) -> float:
    r"""Concordance correlation coefficient.

    .. math::

        \rho_c = \frac{2\rho\sigma_\text{prediction}\sigma_\text{truth}}
                      {\sigma_\text{prediction}^2 + \sigma_\text{truth}^2
                       + (\mu_\text{prediction}-\mu_\text{truth})^2}

    where :math:`\rho` is the Pearson correlation coefficient,
    :math:`\mu` the mean
    and :math:`\sigma^2` the variance.\ :footcite:`Lin1989`

    .. footbibliography::

    Args:
        truth: ground truth values
        prediction: predicted values
        ignore_nan: if ``True``
            all samples that contain ``NaN``
            in ``truth`` or ``prediction``
            are ignored

    Returns:
        concordance correlation coefficient :math:`\in [-1, 1]`

    Raises:
        ValueError: if ``truth`` and ``prediction`` differ in length

    Examples:
        >>> concordance_cc([0, 1, 2], [0, 1, 1])
        0.6666666666666665

    """
    assert_equal_length(truth, prediction)

    if not isinstance(truth, np.ndarray):
        truth = np.array(list(truth))
    if not isinstance(prediction, np.ndarray):
        prediction = np.array(list(prediction))

    if ignore_nan:
        mask = ~(np.isnan(truth) | np.isnan(prediction))
        truth = truth[mask]
        prediction = prediction[mask]

    if len(prediction) < 2:
        return np.nan

    length = prediction.size
    mean_y = np.mean(truth)
    mean_x = np.mean(prediction)
    a = prediction - mean_x
    b = truth - mean_y

    numerator = 2 * np.dot(a, b)
    denominator = np.dot(a, a) + np.dot(b, b) + length * (mean_x - mean_y) ** 2

    if denominator == 0:
        ccc = np.nan
    else:
        ccc = numerator / denominator

    return float(ccc)

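
# Illustrative sketch (not part of the audmetric source): with
# ``ignore_nan=True`` the pair containing ``NaN`` is removed before the
# coefficient is computed, leaving three perfectly agreeing samples.
#
#     >>> concordance_cc([0, 1, 2, float("nan")], [0, 1, 2, 0], ignore_nan=True)
#     1.0
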
def confusion_matrix(
    truth: Sequence[object],
    prediction: Sequence[object],
    labels: Sequence[object] = None,
    *,
    normalize: bool = False,
) -> list[list[int | float]]:
    r"""Confusion matrix.

    Args:
        truth: ground truth values/classes
        prediction: predicted values/classes
        labels: included labels in preferred ordering.
            If no labels are supplied,
            they will be inferred from
            :math:`\{\text{prediction}, \text{truth}\}`
            and ordered alphabetically.
        normalize: normalize confusion matrix over the rows

    Returns:
        confusion matrix

    Raises:
        ValueError: if ``truth`` and ``prediction`` differ in length

    Examples:
        >>> truth = [0, 1, 2]
        >>> prediction = [0, 2, 0]
        >>> confusion_matrix(truth, prediction)
        [[1, 0, 0], [0, 0, 1], [1, 0, 0]]

    """
    assert_equal_length(truth, prediction)
    if labels is None:
        labels = infer_labels(truth, prediction)

    truth = np.array(truth)
    prediction = np.array(prediction)

    matrix = []
    for row in labels:
        row_indices = np.where(truth == row)
        y_row = prediction[row_indices]
        row_matrix = []
        for column in labels:
            row_matrix += [len(np.where(y_row == column)[0])]
        matrix += [row_matrix]

    if normalize:
        for idx, row in enumerate(matrix):
            if np.sum(row) != 0:
                row_sum = float(np.sum(row))
                matrix[idx] = [x / row_sum for x in row]

    return matrix

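
# Illustrative sketch (not part of the audmetric source): with
# ``normalize=True`` every row is divided by its sum, so each row shows the
# distribution of predictions for one ground truth class.
#
#     >>> confusion_matrix([0, 1, 1], [0, 1, 0], normalize=True)
#     [[1.0, 0.0], [0.5, 0.5]]
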
[docs]def detection_error_tradeoff( truth: Sequence[bool | int], prediction: Sequence[bool | int | float], ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: r"""Detection error tradeoff for verification experiments. The `detection error tradeoff (DET)`_ is a graph showing the false non-match rate (FNMR) against the false match rate (FMR). The FNMR indicates how often an enrolled speaker was missed. The FMR indicates how often an impostor was verified as the enrolled speaker. This function does not return a figure, but the FMR and FNMR, together with the corresponding verification thresholds at which a similarity value was regarded to belong to the enrolled speaker. ``truth`` may only contain entries like ``[1, 0, True, False...]``, whereas prediction values can also contain similarity scores, e.g. ``[0.8, 0.1, ...]``. The implementation was inspired by pyeer.eer_stats.calculate_roc but has been accelerated by using numpy-arrays instead of lists. .. _detection error tradeoff (DET): https://en.wikipedia.org/wiki/Detection_error_tradeoff .. _pyeer: https://github.com/manuelaguadomtz/pyeer Args: truth: ground truth classes prediction: predicted classes or similarity scores Returns: * false match rate (FMR) * false non-match rate (FNMR) * verification thresholds Raises: ValueError: if ``truth`` contains values different from ``1, 0, True, False`` Examples: >>> truth = [1, 0] >>> prediction = [0.9, 0.1] >>> detection_error_tradeoff(truth, prediction) (array([1., 0.]), array([0., 0.]), array([0.1, 0.9])) """ # noqa: E501 # Get mated scores # (genuine matching scores) # and non-mated scores # (impostor matching scores) gscores, iscores = _matching_scores(truth, prediction) gscores_number = len(gscores) iscores_number = len(iscores) # Labeling genuine scores as 1 and impostor scores as 0 gscores = np.column_stack((gscores, np.ones(gscores_number, dtype=int))) iscores = np.column_stack((iscores, np.zeros(iscores_number, dtype=int))) # Stacking scores all_scores = np.concatenate([gscores, iscores]) sorted_indices = np.argsort(all_scores[:, 0]) scores = all_scores[sorted_indices] cumul = np.cumsum(scores[:, 1]) # Grouping scores thresholds, u_indices = np.unique(scores[:, 0], return_index=True) # Calculating FNM and FM distributions fnm = cumul[u_indices] - scores[u_indices][:, 1] # rejecting s < t fm = iscores_number - (u_indices - fnm) # Calculating FMR and FNMR fnmr = fnm / gscores_number fmr = fm / iscores_number return fmr, fnmr, thresholds
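
# Illustrative sketch (not part of the audmetric source): the returned arrays
# can be used to pick an operating point, e.g. the smallest threshold whose
# false match rate stays at or below a target value. ``truth`` and
# ``prediction`` are assumed to hold verification labels and similarity
# scores as described in the docstring above.
#
#     >>> fmr, fnmr, thresholds = detection_error_tradeoff(truth, prediction)
#     >>> candidates = thresholds[fmr <= 0.01]  # thresholds with FMR <= 1%
#     >>> operating_point = candidates[0] if len(candidates) > 0 else None
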
[docs]def diarization_error_rate( truth: pd.Series, prediction: pd.Series, *, individual_file_mapping: bool = False, num_workers: int = 1, multiprocessing: bool = False, ) -> float: r"""Diarization error rate. .. math:: \text{DER} = \frac{\text{confusion}+\text{false alarm}+\text{miss}} {\text{total}} where :math:`\text{confusion}` is the total confusion duration, :math:`\text{false alarm}` is the total duration of predictions without an overlapping ground truth, :math:`\text{miss}` is the total duration of ground truth without an overlapping prediction, and :math:`\text{total}` is the total duration of ground truth segments. The diarization error rate can used when the labels are not known by the prediction model, e.g. for the task of speaker diarization on unknown speakers. This metric is computed the same way as :func:`audmetric.identification_error_rate`, but first creates a one-to-one mapping between truth and prediction labels. This implementation uses the 'greedy' method to compute the one-to-one mapping between truth and predicted labels. This method is faster than other implementations that optimize the confusion term, but may slightly over-estimate the diarization error rate. :footcite:`Bredin2017` .. footbibliography:: Args: truth: ground truth labels with a segmented index conform to `audformat`_ prediction: predicted labels with a segmented index conform to `audformat`_ individual_file_mapping: whether to create the mapping between truth and prediction labels individually for each file. If ``False``, all segments are taken into account to compute the mapping num_workers: number of threads or 1 for sequential processing multiprocessing: use multiprocessing instead of multithreading Returns: diarization error rate Raises: ValueError: if ``truth`` or ``prediction`` do not have a segmented index conform to `audformat`_ Examples: >>> import pandas as pd >>> import audformat >>> truth = pd.Series( ... index=audformat.segmented_index( ... files=["f1.wav", "f1.wav"], ... starts=[0.0, 0.1], ... ends=[0.1, 0.2], ... ), ... data=["a", "b"], ... ) >>> prediction = pd.Series( ... index=audformat.segmented_index( ... files=["f1.wav", "f1.wav", "f1.wav"], ... starts=[0, 0.1, 0.1], ... ends=[0.1, 0.15, 0.2], ... ), ... data=["0", "1", "0"], ... ) >>> diarization_error_rate(truth, prediction) 0.5 .. _audformat: https://audeering.github.io/audformat/data-format.html """ if not is_segmented_index(truth) or not is_segmented_index(prediction): raise ValueError( "The truth and prediction " "should be a pandas Series with a segmented index conform to audformat." 
) # Map prediction and truth labels to unique names # to avoid confusion when there is an overlap pred_labels = prediction.unique() unique_pred_mapper = {label: f"p{i}" for i, label in enumerate(pred_labels)} prediction = prediction.map(unique_pred_mapper) truth_labels = truth.unique() unique_truth_mapper = {label: f"t{i}" for i, label in enumerate(truth_labels)} truth = truth.map(unique_truth_mapper) # If mapping should be computed individually for each file, # add a unique prefix to each label based on the file if individual_file_mapping: files = set(prediction.index.get_level_values(FILE).unique()).union( truth.index.get_level_values(FILE).unique() ) unique_file_prefix = {file: f"f{i}" for i, file in enumerate(files)} prediction_file_ids = prediction.reset_index()[FILE].map(unique_file_prefix) prediction_file_ids.index = prediction.index prediction = prediction_file_ids + prediction truth_file_ids = truth.reset_index()[FILE].map(unique_file_prefix) truth_file_ids.index = truth.index truth = truth_file_ids + truth # Now map from prediction label to truth label, # leaving prediction labels without a match as is pred2truthlabel = _diarization_mapper( truth, prediction, individual_file_mapping=individual_file_mapping, num_workers=num_workers, multiprocessing=multiprocessing, ) mapped_prediction = prediction.replace(pred2truthlabel) return identification_error_rate( truth, mapped_prediction, num_workers=num_workers, multiprocessing=multiprocessing, )
def edit_distance(
    truth: str | Sequence[int],
    prediction: str | Sequence[int],
) -> int:
    r"""Edit distance between two sequences of chars or ints.

    The implementation follows the `Wagner-Fischer algorithm`_.

    .. _Wagner-Fischer algorithm:
        https://en.wikipedia.org/wiki/Wagner%E2%80%93Fischer_algorithm

    Args:
        truth: ground truth sequence
        prediction: predicted sequence

    Returns:
        edit distance

    Examples:
        >>> truth = "lorem"
        >>> prediction = "lorm"
        >>> edit_distance(truth, prediction)
        1
        >>> truth = [0, 1, 2]
        >>> prediction = [0, 1]
        >>> edit_distance(truth, prediction)
        1

    """
    if truth == prediction:
        return 0
    elif len(prediction) == 0:
        return len(truth)
    elif len(truth) == 0:
        return len(prediction)

    m0 = [None] * (len(truth) + 1)
    m1 = [None] * (len(truth) + 1)

    for i in range(len(m0)):
        m0[i] = i

    for i in range(len(prediction)):
        m1[0] = i + 1
        for j in range(len(truth)):
            cost = 0 if prediction[i] == truth[j] else 1
            m1[j + 1] = min(
                m1[j] + 1,  # deletion
                m0[j + 1] + 1,  # insertion
                m0[j] + cost,  # substitution
            )
        for j in range(len(m0)):
            m0[j] = m1[j]

    return m1[len(truth)]

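
# Illustrative sketch (not part of the audmetric source): the classic
# Levenshtein example; three single-character edits turn "kitten" into
# "sitting" (two substitutions and one insertion).
#
#     >>> edit_distance("kitten", "sitting")
#     3
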
def equal_error_rate(
    truth: Sequence[bool | int],
    prediction: Sequence[bool | int | float],
) -> tuple[float, collections.namedtuple]:
    r"""Equal error rate for verification tasks.

    The equal error rate (EER) is the point
    where the false non-match rate (FNMR)
    and the impostor or false match rate (FMR)
    are identical.
    The FNMR indicates
    how often an enrolled speaker was missed.
    The FMR indicates
    how often an impostor was verified as the enrolled speaker.

    In practice the score distribution is not continuous
    and an interval is returned instead.
    The EER value will be set as the midpoint
    of this interval:\ :footcite:`Maio2002`

    .. math::

        \text{EER} = \frac{
            \min(\text{FNMR}[t], \text{FMR}[t])
            + \max(\text{FNMR}[t], \text{FMR}[t])
        }{2}

    with :math:`t = \text{argmin}(|\text{FNMR} - \text{FMR}|)`.

    ``truth`` may only contain entries like ``[1, 0, True, False...]``,
    whereas prediction values
    can also contain similarity scores,
    e.g. ``[0.8, 0.1, ...]``.

    The implementation is identical to the one provided
    by the pyeer_ package.

    .. footbibliography::

    .. _pyeer: https://github.com/manuelaguadomtz/pyeer

    Args:
        truth: ground truth classes
        prediction: predicted classes or similarity scores

    Returns:
        * equal error rate (EER)
        * namedtuple containing ``fmr``, ``fnmr``, ``thresholds``, ``threshold``,
          where the last entry is the verification threshold
          corresponding to the returned EER

    Raises:
        ValueError: if ``truth`` contains values
            different from ``1, 0, True, False``

    Examples:
        >>> truth = [0, 1, 0, 1, 0]
        >>> prediction = [0.2, 0.8, 0.4, 0.5, 0.5]
        >>> eer, stats = equal_error_rate(truth, prediction)
        >>> eer
        0.16666666666666666
        >>> stats.threshold
        0.5

    """
    Stats = collections.namedtuple(
        "stats",
        [
            "fmr",  # False match rates (FMR)
            "fnmr",  # False non-match rates (FNMR)
            "thresholds",  # Thresholds
            "threshold",  # verification threshold for EER
        ],
    )
    fmr, fnmr, thresholds = detection_error_tradeoff(truth, prediction)
    diff = fmr - fnmr
    # t1 and t2 are indices into the threshold array
    t2 = np.where(diff <= 0)[0]
    if len(t2) > 0:
        t2 = t2[0]
    else:
        warnings.warn(
            "The false match rate "
            "and false non-match rate curves "
            "do not intersect each other.",
            RuntimeWarning,
        )
        eer = 1.0
        threshold = float(thresholds[0])
        return eer, Stats(fmr, fnmr, thresholds, threshold)
    t1 = t2 - 1 if diff[t2] != 0 and t2 != 0 else t2
    if fmr[t1] + fnmr[t1] <= fmr[t2] + fnmr[t2]:
        eer = (fnmr[t1] + fmr[t1]) / 2.0
        threshold = thresholds[t1]
    else:  # pragma: nocover (couldn't find a test to trigger this)
        eer = (fnmr[t2] + fmr[t2]) / 2.0
        threshold = thresholds[t2]
    eer = float(eer)
    threshold = float(threshold)
    return eer, Stats(fmr, fnmr, thresholds, threshold)

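
# Illustrative sketch (not part of the audmetric source): the returned stats
# can be used to make accept/reject decisions at the EER operating point,
# reusing ``truth`` and ``prediction`` from the docstring example above.
#
#     >>> eer, stats = equal_error_rate(truth, prediction)
#     >>> [score >= stats.threshold for score in prediction]
#     [False, True, False, True, True]
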
[docs]def event_confusion_matrix( truth: pd.Series, prediction: pd.Series, labels: Sequence[object] | None = None, *, onset_tolerance: float | None = 0.0, offset_tolerance: float | None = 0.0, duration_tolerance: float | None = None, normalize: bool = False, ) -> list[list[int | float]]: r"""Event-based confusion. This metric compares not only the labels of prediction and ground truth, but also the time windows they occur in. Each event is considered to be correctly identified if the predicted label is the same as the ground truth label, and if the onset is within the given ``onset_tolerance`` (in seconds) and the offset is within the given ``offset_tolerance`` (in seconds). Additionally to the ``offset_tolerance``, one can also specify the ``duration_tolerance``, to ensure that the offset occurs within a certain proportion of the reference event duration. If a prediction fulfills the ``duration_tolerance`` but not the ``offset_tolerance`` (or vice versa), it is still considered to be an overlapping segment. :footcite:`Mesaros2016` The resulting confusion matrix has one more row and and one more column than there are labels. The last row/column corresponds to the absence of any event. This allows to distinguish between segments that overlap but have differing labels, and false negatives that have no overlapping predicted segment as well as false positives that have no overlapping ground truth segment. .. footbibliography:: Args: truth: ground truth labels with a segmented index conform to `audformat`_ prediction: predicted labels with a segmented index conform to `audformat`_ labels: included labels in preferred ordering. If no labels are supplied, they will be inferred from :math:`\{\text{prediction}, \text{truth}\}` and ordered alphabetically onset_tolerance: the onset tolerance in seconds. If the predicted segment's onset does not occur within this time window compared to the ground truth segment's onset, it is not considered correct offset_tolerance: the offset tolerance in seconds. If the predicted segment's offset does not occur within this time window compared to the ground truth segment's offset, it is not considered correct, unless the ``duration_tolerance`` is specified and fulfilled duration_tolerance: the duration tolerance as a measure of proportion of the ground truth segment's total duration. If the ``offset_tolerance`` is not fulfilled, and the predicted segment's offset does not occur within this time window compared to the ground truth segment's offset, it is not considered correct normalize: normalize confusion matrix over the rows Returns: event confusion matrix Raises: ValueError: if ``truth`` or ``prediction`` do not have a segmented index conform to `audformat`_ Examples: >>> import pandas as pd >>> import audformat >>> truth = pd.Series( ... index=audformat.segmented_index( ... files=["f1.wav"] * 4, ... starts=[0, 0.1, 0.2, 0.3], ... ends=[0.1, 0.2, 0.3, 0.4], ... ), ... data=["a", "a", "b", "b"], ... ) >>> prediction = pd.Series( ... index=audformat.segmented_index( ... files=["f1.wav"] * 4 + ["f2.wav"], ... starts=[0, 0.09, 0.2, 0.31, 0.0], ... ends=[0.1, 0.2, 0.3, 0.41, 1.0], ... ), ... data=["a", "b", "a", "b", "b"], ... ) >>> event_confusion_matrix( ... truth, prediction, onset_tolerance=0.02, offset_tolerance=0.02 ... ) [[1, 1, 0], [1, 1, 0], [0, 1, 0]] .. 
_audformat: https://audeering.github.io/audformat/data-format.html """ if not is_segmented_index(truth) or not is_segmented_index(prediction): raise ValueError( "For event-based metrics, the truth and prediction " "should be a pandas Series with a segmented index conform to audformat." ) if labels is None: labels = infer_labels(truth.values, prediction.values) # Confusion matrix of event labels + "no event" label cm = [[0 for _ in range(len(labels) + 1)] for _ in range(len(labels) + 1)] # Code based on 'greedy' event matching # at https://github.com/TUT-ARG/sed_eval/blob/0cb1b6d11ceec4fe500cc9b31079c9d8666ed6eb/sed_eval/sound_event.py#L1108 for file, file_truth in truth.groupby(level=FILE): file_pred = prediction[prediction.index.get_level_values(FILE) == file] # Sort index of truth and prediction to speedup the matching of segments # and to get the same result regardless of the index order file_truth = file_truth.sort_index() file_pred = file_pred.sort_index() n_truth = len(file_truth) n_pred = len(file_pred) # Store whether each individual truth segment is identified truth_correct = np.zeros(n_truth, dtype=bool) # Store whether each individual predicted segment is correct pred_correct = np.zeros(n_pred, dtype=bool) # Find all correct matches for i, ((_, start_truth, end_truth), label_truth) in enumerate( file_truth.items() ): start_truth = start_truth.total_seconds() end_truth = end_truth.total_seconds() for j, ((_, start_pred, end_pred), label_pred) in enumerate( file_pred.items() ): # Skip segments that have already been matched if pred_correct[j]: continue start_pred = start_pred.total_seconds() end_pred = end_pred.total_seconds() # This predicted segment and all following ones cannot match # if the true start time is exceeded by more than the allowed tolerance # and we move on to the next ground truth segment if onset_tolerance and start_truth + onset_tolerance < start_pred: break # Condition 1: labels are the same if label_truth == label_pred: # Condition 2: segments overlap in onset/offset if _segments_overlap( start_truth, end_truth, start_pred, end_pred, onset_tolerance, offset_tolerance, duration_tolerance, ): # Add entry to confusion matrix, # then move on to next ground truth segment label_id = labels.index(label_truth) cm[label_id][label_id] += 1 truth_correct[i] = True pred_correct[j] = True break # Indices of predicted segments without a ground truth match pred_unmatched = np.nonzero(np.logical_not(pred_correct))[0] # Indices of ground truth segments without a prediction match truth_unmatched = np.nonzero(np.logical_not(truth_correct))[0] preds_incorrect_match = np.zeros(n_pred) # Find overlapping segments with incorrect labels for i in truth_unmatched: start_truth = file_truth.index.get_level_values(START)[i].total_seconds() end_truth = file_truth.index.get_level_values(END)[i].total_seconds() label_truth = file_truth.iloc[i] for j in pred_unmatched: # Skip predicted segments # if they've already been counted as a label mismatch if preds_incorrect_match[j]: continue start_pred = file_pred.index.get_level_values(START)[j].total_seconds() end_pred = file_pred.index.get_level_values(END)[j].total_seconds() # This predicted segment and all following ones cannot match # if the true start time is exceeded by more than the allowed tolerance # and we move on to the next ground truth segment if onset_tolerance and start_truth + onset_tolerance < start_pred: break label_pred = file_pred.iloc[j] if _segments_overlap( start_truth, end_truth, start_pred, end_pred, onset_tolerance, 
offset_tolerance, duration_tolerance, ): preds_incorrect_match[j] = True # Segment overlaps although the label is a mismatch # so it counts as a confused label cm[labels.index(label_truth)][labels.index(label_pred)] += 1 break # Fill in remaining errors that have no confusions for i, label in enumerate(labels): n_label_truth = len(truth[truth == label]) # Count any ground truth segments that have no overlapping prediction at all n_missed = n_label_truth - sum(cm[i][: len(labels)]) cm[i][-1] += n_missed n_label_pred = len(prediction[prediction == label]) # Count any predictions that have no overlap with ground truth segments n_extra = n_label_pred - sum([cm[j][i] for j in range(len(labels))]) cm[-1][i] = n_extra if normalize: for idx, row in enumerate(cm): if np.sum(row) != 0: row_sum = float(np.sum(row)) cm[idx] = [x / row_sum for x in row] return cm
[docs]def event_error_rate( truth: str | Sequence[str | Sequence[int]], prediction: (str | Sequence[str | Sequence[int]]), ) -> float: r"""Event error rate based on edit distance. The event error rate is computed by aggregating the mean edit distances of each (truth, prediction)-pair and averaging the aggregated score by the number of pairs. The mean edit distance of each (truth, prediction)-pair is computed as an average of the edit distance over the length of the longer sequence of the corresponding pair. By normalizing over the longer sequence the normalized distance is bound to [0, 1]. Args: truth: ground truth classes prediction: predicted classes Returns: event error rate Raises: ValueError: if ``truth`` and ``prediction`` differ in length Examples: >>> event_error_rate([[0, 1]], [[0]]) 0.5 >>> event_error_rate([[0, 1], [2]], [[0], [2]]) 0.25 >>> event_error_rate(["lorem"], ["lorm"]) 0.2 >>> event_error_rate(["lorem", "ipsum"], ["lorm", "ipsum"]) 0.1 """ truth = audeer.to_list(truth) prediction = audeer.to_list(prediction) assert_equal_length(truth, prediction) eer = 0.0 for t, p in zip(truth, prediction): n = max(len(t), len(p)) n = n if n > 1 else 1 eer += edit_distance(t, p) / n num_samples = len(truth) if len(truth) > 1 else 1 return eer / num_samples
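
# Illustrative sketch (not part of the audmetric source): each pair is
# normalized by the length of its longer sequence, so one edit in each of the
# two five-character words gives an event error rate of 0.2.
#
#     >>> event_error_rate(["hello", "world"], ["hallo", "word"])
#     0.2
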
[docs]def event_fscore_per_class( truth: pd.Series, prediction: pd.Series, labels: Sequence[object] | None = None, *, zero_division: float = 0.0, propagate_nans: bool = False, onset_tolerance: float | None = 0.0, offset_tolerance: float | None = 0.0, duration_tolerance: float | None = None, ) -> dict[str, float]: r"""Event-based F-score per class. .. math:: \text{fscore}_k = \frac{\text{true positive}_k} {\text{true positive}_k + \frac{1}{2} (\text{false positive}_k + \text{false negative}_k)} This metric compares not only the labels of prediction and ground truth, but also the time windows they occur in. Each event is considered to be correctly identified if the predicted label is the same as the ground truth label, and if the onset is within the given ``onset_tolerance`` (in seconds) and the offset is within the given ``offset_tolerance`` (in seconds). Additionally to the ``offset_tolerance``, one can also specify the ``duration_tolerance``, to ensure that the offset occurs within a certain proportion of the reference event duration. If a prediction fulfills the ``duration_tolerance`` but not the ``offset_tolerance`` (or vice versa), it is still considered to be an overlapping segment. :footcite:`Mesaros2016` .. footbibliography:: Args: truth: ground truth values/classes prediction: predicted values/classes labels: included labels in preferred ordering. If no labels are supplied, they will be inferred from :math:`\{\text{prediction}, \text{truth}\}` and ordered alphabetically. zero_division: set the value to return when there is a zero division propagate_nans: whether to set the F-score to ``NaN`` when recall or precision are ``NaN``. If ``False``, the F-score is only set to ``NaN`` when both recall and precision are ``NaN`` onset_tolerance: the onset tolerance in seconds. If the predicted segment's onset does not occur within this time window compared to the ground truth segment's onset, it is not considered correct offset_tolerance: the offset tolerance in seconds. If the predicted segment's offset does not occur within this time window compared to the ground truth segment's offset, it is not considered correct, unless the ``duration_tolerance`` is specified and fulfilled duration_tolerance: the duration tolerance as a measure of proportion of the ground truth segment's total duration. If the ``offset_tolerance`` is not fulfilled, and the predicted segment's offset does not occur within this time window compared to the ground truth segment's offset, it is not considered correct Returns: dictionary with label as key and F-score as value Raises: ValueError: if ``truth`` or ``prediction`` do not have a segmented index conform to `audformat`_ Examples: >>> import pandas as pd >>> import audformat >>> truth = pd.Series( ... index=audformat.segmented_index( ... files=["f1.wav", "f1.wav"], ... starts=[0.0, 0.1], ... ends=[0.1, 0.2], ... ), ... data=["a", "b"], ... ) >>> prediction = pd.Series( ... index=audformat.segmented_index( ... files=["f1.wav", "f1.wav"], ... starts=[0, 0.09], ... ends=[0.1, 0.2], ... ), ... data=["a", "a"], ... ) >>> event_fscore_per_class( ... truth, prediction, onset_tolerance=0.02, offset_tolerance=0.02 ... ) {'a': 0.6666666666666666, 'b': 0.0} .. 
_audformat: https://audeering.github.io/audformat/data-format.html """ if labels is None: labels = infer_labels(truth, prediction) precision = event_precision_per_class( truth, prediction, labels, zero_division=zero_division, onset_tolerance=onset_tolerance, offset_tolerance=offset_tolerance, duration_tolerance=duration_tolerance, ) recall = event_recall_per_class( truth, prediction, labels, zero_division=zero_division, onset_tolerance=onset_tolerance, offset_tolerance=offset_tolerance, duration_tolerance=duration_tolerance, ) fscore = {} for label, p, r in zip(labels, precision.values(), recall.values()): if np.isnan(p) or np.isnan(r): if propagate_nans: fscore[label] = np.nan else: fscore[label] = 0.0 elif p * r == 0: fscore[label] = 0.0 else: fscore[label] = (2 * p * r) / (p + r) return fscore
[docs]def event_precision_per_class( truth: pd.Series, prediction: pd.Series, labels: Sequence[object] | None = None, *, zero_division: float = 0.0, onset_tolerance: float | None = 0.0, offset_tolerance: float | None = 0.0, duration_tolerance: float | None = None, ) -> dict[str, float]: r"""Event-based precision per class. .. math:: \text{precision}_k = \frac{\text{true positive}_k} {\text{true positive}_k + \text{false positive}_k} This metric compares not only the labels of prediction and ground truth, but also the time windows they occur in. Each event is considered to be correctly identified if the predicted label is the same as the ground truth label, and if the onset is within the given ``onset_tolerance`` (in seconds) and the offset is within the given ``offset_tolerance`` (in seconds). Additionally to the ``offset_tolerance``, one can also specify the ``duration_tolerance``, to ensure that the offset occurs within a certain proportion of the reference event duration. If a prediction fulfills the ``duration_tolerance`` but not the ``offset_tolerance`` (or vice versa), it is still considered to be an overlapping segment. :footcite:`Mesaros2016` .. footbibliography:: Args: truth: ground truth values/classes prediction: predicted values/classes labels: included labels in preferred ordering. If no labels are supplied, they will be inferred from :math:`\{\text{prediction}, \text{truth}\}` and ordered alphabetically. zero_division: set the value to return when there is a zero division onset_tolerance: the onset tolerance in seconds. If the predicted segment's onset does not occur within this time window compared to the ground truth segment's onset, it is not considered correct offset_tolerance: the offset tolerance in seconds. If the predicted segment's offset does not occur within this time window compared to the ground truth segment's offset, it is not considered correct, unless the ``duration_tolerance`` is specified and fulfilled duration_tolerance: the duration tolerance as a measure of proportion of the ground truth segment's total duration. If the ``offset_tolerance`` is not fulfilled, and the predicted segment's offset does not occur within this time window compared to the ground truth segment's offset, it is not considered correct Returns: dictionary with label as key and precision as value Raises: ValueError: if ``truth`` or ``prediction`` do not have a segmented index conform to `audformat`_ Examples: >>> import pandas as pd >>> import audformat >>> truth = pd.Series( ... index=audformat.segmented_index( ... files=["f1.wav", "f1.wav"], ... starts=[0.0, 0.1], ... ends=[0.1, 0.2], ... ), ... data=["a", "b"], ... ) >>> prediction = pd.Series( ... index=audformat.segmented_index( ... files=["f1.wav", "f1.wav"], ... starts=[0.0, 0.09], ... ends=[0.11, 0.2], ... ), ... data=["a", "a"], ... ) >>> event_precision_per_class( ... truth, prediction, onset_tolerance=0.02, offset_tolerance=0.02 ... ) {'a': 0.5, 'b': 0.0} .. _audformat: https://audeering.github.io/audformat/data-format.html """ return _event_metric_per_class( truth, prediction, labels, zero_division, onset_tolerance, offset_tolerance, duration_tolerance, axis=0, )
[docs]def event_recall_per_class( truth: pd.Series, prediction: pd.Series, labels: Sequence[object] | None = None, *, zero_division: float = 0.0, onset_tolerance: float | None = 0.0, offset_tolerance: float | None = 0.0, duration_tolerance: float | None = None, ) -> dict[str, float]: r"""Event-based recall per class. .. math:: \text{recall}_k = \frac{\text{true positive}_k} {\text{true positive}_k + \text{false negative}_k} This metric compares not only the labels of prediction and ground truth, but also the time windows they occur in. Each event is considered to be correctly identified if the predicted label is the same as the ground truth label, and if the onset is within the given ``onset_tolerance`` (in seconds) and the offset is within the given ``offset_tolerance`` (in seconds). Additionally to the ``offset_tolerance``, one can also specify the ``duration_tolerance``, to ensure that the offset occurs within a certain proportion of the reference event duration. If a prediction fulfills the ``duration_tolerance`` but not the ``offset_tolerance`` (or vice versa), it is still considered to be an overlapping segment. :footcite:`Mesaros2016` .. footbibliography:: Args: truth: ground truth values/classes prediction: predicted values/classes labels: included labels in preferred ordering. If no labels are supplied, they will be inferred from :math:`\{\text{prediction}, \text{truth}\}` and ordered alphabetically. zero_division: set the value to return when there is a zero division onset_tolerance: the onset tolerance in seconds. If the predicted segment's onset does not occur within this time window compared to the ground truth segment's onset, it is not considered correct offset_tolerance: the offset tolerance in seconds. If the predicted segment's offset does not occur within this time window compared to the ground truth segment's offset, it is not considered correct, unless the ``duration_tolerance`` is specified and fulfilled duration_tolerance: the duration tolerance as a measure of proportion of the ground truth segment's total duration. If the ``offset_tolerance`` is not fulfilled, and the predicted segment's offset does not occur within this time window compared to the ground truth segment's offset, it is not considered correct Returns: dictionary with label as key and recall as value Raises: ValueError: if ``truth`` or ``prediction`` do not have a segmented index conform to `audformat`_ Examples: >>> import pandas as pd >>> import audformat >>> truth = pd.Series( ... index=audformat.segmented_index( ... files=["f1.wav", "f1.wav"], ... starts=[0.0, 0.1], ... ends=[0.1, 0.2], ... ), ... data=["a", "b"], ... ) >>> prediction = pd.Series( ... index=audformat.segmented_index( ... files=["f1.wav", "f1.wav"], ... starts=[0.0, 0.09], ... ends=[0.11, 0.2], ... ), ... data=["a", "a"], ... ) >>> event_recall_per_class( ... truth, prediction, onset_tolerance=0.02, offset_tolerance=0.02 ... ) {'a': 1.0, 'b': 0.0} .. _audformat: https://audeering.github.io/audformat/data-format.html """ return _event_metric_per_class( truth, prediction, labels, zero_division, onset_tolerance, offset_tolerance, duration_tolerance, axis=1, )
[docs]def event_unweighted_average_fscore( truth: pd.Series, prediction: pd.Series, labels: Sequence[object] | None = None, *, zero_division: float = 0.0, propagate_nans: bool = False, onset_tolerance: float | None = 0.0, offset_tolerance: float | None = 0.0, duration_tolerance: float | None = None, ) -> float: r"""Event-based unweighted average F-score. .. math:: \text{UAF} = \frac{1}{K} \sum^K_{k=1} \frac{\text{true positive}_k} {\text{true positive}_k + \frac{1}{2} (\text{false positive}_k + \text{false negative}_k)} This metric compares not only the labels of prediction and ground truth, but also the time windows they occur in. Each event is considered to be correctly identified if the predicted label is the same as the ground truth label, and if the onset is within the given ``onset_tolerance`` (in seconds) and the offset is within the given ``offset_tolerance`` (in seconds). Additionally to the ``offset_tolerance``, one can also specify the ``duration_tolerance``, to ensure that the offset occurs within a certain proportion of the reference event duration. If a prediction fulfills the ``duration_tolerance`` but not the ``offset_tolerance`` (or vice versa), it is still considered to be an overlapping segment. :footcite:`Mesaros2016` .. footbibliography:: Args: truth: ground truth values/classes prediction: predicted values/classes labels: included labels in preferred ordering. If no labels are supplied, they will be inferred from :math:`\{\text{prediction}, \text{truth}\}` and ordered alphabetically. zero_division: set the value to return when there is a zero division propagate_nans: whether to set the F-score to ``NaN`` when recall or precision are ``NaN``. If ``False``, the F-score is only set to ``NaN`` when both recall and precision are ``NaN`` onset_tolerance: the onset tolerance in seconds. If the predicted segment's onset does not occur within this time window compared to the ground truth segment's onset, it is not considered correct offset_tolerance: the offset tolerance in seconds. If the predicted segment's offset does not occur within this time window compared to the ground truth segment's offset, it is not considered correct, unless the ``duration_tolerance`` is specified and fulfilled duration_tolerance: the duration tolerance as a measure of proportion of the ground truth segment's total duration. If the ``offset_tolerance`` is not fulfilled, and the predicted segment's offset does not occur within this time window compared to the ground truth segment's offset, it is not considered correct Returns: event-based unweighted average f-score Examples: >>> import pandas as pd >>> import audformat >>> truth = pd.Series( ... index=audformat.segmented_index( ... files=["f1.wav", "f1.wav"], ... starts=[0.0, 0.1], ... ends=[0.1, 0.2], ... ), ... data=["a", "b"], ... ) >>> prediction = pd.Series( ... index=audformat.segmented_index( ... files=["f1.wav", "f1.wav"], ... starts=[0, 0.09], ... ends=[0.1, 0.2], ... ), ... data=["a", "a"], ... ) >>> event_unweighted_average_fscore( ... truth, prediction, onset_tolerance=0.02, offset_tolerance=0.02 ... ) 0.3333333333333333 .. _audformat: https://audeering.github.io/audformat/data-format.html """ fscore = event_fscore_per_class( truth, prediction, labels, zero_division=zero_division, propagate_nans=propagate_nans, onset_tolerance=onset_tolerance, offset_tolerance=offset_tolerance, duration_tolerance=duration_tolerance, ) fscore = np.array(list(fscore.values())) return float(np.nanmean(fscore))
[docs]def fscore_per_class( truth: Sequence[object], prediction: Sequence[object], labels: Sequence[object] = None, *, zero_division: float = 0, ) -> dict[str, float]: r"""F-score per class. .. math:: \text{fscore}_k = \frac{\text{true positive}_k} {\text{true positive}_k + \frac{1}{2} (\text{false positive}_k + \text{false negative}_k)} Args: truth: ground truth values/classes prediction: predicted values/classes labels: included labels in preferred ordering. If no labels are supplied, they will be inferred from :math:`\{\text{prediction}, \text{truth}\}` and ordered alphabetically. zero_division: set the value to return when there is a zero division Returns: dictionary with label as key and F-score as value Examples: >>> fscore_per_class([0, 0], [0, 1]) {0: 0.6666666666666666, 1: 0.0} """ if labels is None: labels = infer_labels(truth, prediction) precision = precision_per_class( truth, prediction, labels, zero_division=zero_division, ) recall = recall_per_class( truth, prediction, labels, zero_division=zero_division, ) fscore = {} for label, p, r in zip(labels, precision.values(), recall.values()): if p * r == 0: fscore[label] = 0.0 elif (p == 0.0 and np.isnan(r)) or (r == 0.0 and np.isnan(p)): fscore[label] = 0.0 else: fscore[label] = (2 * p * r) / (p + r) return fscore
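
# Illustrative sketch (not part of the audmetric source): the per-class
# F-score is the harmonic mean of per-class precision and recall, which can
# be checked against the corresponding helper functions.
#
#     >>> precision_per_class([0, 0], [0, 1])
#     {0: 1.0, 1: 0.0}
#     >>> recall_per_class([0, 0], [0, 1])
#     {0: 0.5, 1: 0.0}
#     >>> 2 * 1.0 * 0.5 / (1.0 + 0.5)  # reproduces the F-score of class 0
#     0.6666666666666666
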
[docs]def identification_error_rate( truth: pd.Series, prediction: pd.Series, *, num_workers: int = 1, multiprocessing: bool = False, ) -> float: r"""Identification error rate. .. math:: \text{IER} = \frac{\text{confusion}+\text{false alarm}+\text{miss}} {\text{total}} where :math:`\text{confusion}` is the total confusion duration, :math:`\text{false alarm}` is the total duration of predictions without an overlapping ground truth, :math:`\text{miss}` is the total duration of ground truth without an overlapping prediction, and :math:`\text{total}` is the total duration of ground truth segments. :footcite:`Bredin2017` The identification error rate should be used when the labels are known by the prediction model. If this isn't the case, consider using :func:`audmetric.diarization_error_rate`. .. footbibliography:: Args: truth: ground truth labels with a segmented index conform to `audformat`_ prediction: predicted labels with a segmented index conform to `audformat`_ num_workers: number of threads or 1 for sequential processing multiprocessing: use multiprocessing instead of multithreading Returns: identification error rate Raises: ValueError: if ``truth`` or ``prediction`` do not have a segmented index conform to `audformat`_ Examples: >>> import pandas as pd >>> import audformat >>> truth = pd.Series( ... index=audformat.segmented_index( ... files=["f1.wav", "f1.wav"], ... starts=[0.0, 0.1], ... ends=[0.1, 0.2], ... ), ... data=["a", "b"], ... ) >>> prediction = pd.Series( ... index=audformat.segmented_index( ... files=["f1.wav", "f1.wav", "f1.wav"], ... starts=[0, 0.1, 0.1], ... ends=[0.1, 0.15, 0.2], ... ), ... data=["a", "b", "a"], ... ) >>> identification_error_rate(truth, prediction) 0.5 .. _audformat: https://audeering.github.io/audformat/data-format.html """ if not is_segmented_index(truth) or not is_segmented_index(prediction): raise ValueError( "The truth and prediction " "should be a pandas Series with a segmented index conform to audformat." ) files = ( truth.index.get_level_values(FILE) .unique() .union(prediction.index.get_level_values(FILE).unique()) ) if len(files) > 0: results = audeer.run_tasks( _file_ier, params=[ ( ( truth[truth.index.get_level_values(FILE) == file], prediction[prediction.index.get_level_values(FILE) == file], ), {}, ) for file in files ], num_workers=num_workers, multiprocessing=multiprocessing, ) total_confusion, total_false_alarm, total_misses, total_duration = [ sum(x) for x in zip(*results) ] else: total_confusion = total_false_alarm = total_misses = total_duration = 0.0 numerator = total_confusion + total_false_alarm + total_misses if total_duration == 0.0: ier = 0.0 if numerator == 0.0 else 1.0 else: ier = numerator / total_duration if ier > 1.0: # In this case it is possible that there is no overlap between files # So we warn the user if there are no common files _check_common_files(truth, prediction) return ier
[docs]def linkability( truth: (bool | int | Sequence[bool | int]), prediction: (bool | int | float | Sequence[bool | int | float]), omega: float = 1.0, nbins: int = None, ) -> float: r"""Linkability for verification tasks. Let :math:`s` be the provided prediction score for the similarity of the tested sample. The clipped local linkability metric is then defined as: .. math:: \text{max}(0, p(\text{mated} | s) - p(\text{non-mated} | s)) The higher the value, the more likely that an attacker can link two mated samples. The global linkability metric :math:`D_\text{sys}` is the mean value over all local scores,\ :footcite:`GomezBarrero2017` and in the range :math:`0` and :math:`1`. Implementation is based on `code from M. Maouche`_, which is licensed under LGPL. .. footbibliography:: .. _code from M. Maouche: https://gitlab.inria.fr/magnet/anonymization_metrics Args: truth: ground truth classes prediction: predicted classes or similarity scores omega: prior ratio :math:`\frac{p(\text{mated})}{p(\text{non-mated})}` nbins: number of bins of the histograms that estimate the distributions of mated and non-mated scores. If ``None`` it is set to :math:`\min(\frac{\text{len}(\text{mated})}{10}, 100)` Returns: global linkability :math:`D_\text{sys}` Raises: ValueError: if ``truth`` contains values different from ``1``, ``0``, ``True``, ``False`` Examples: >>> np.random.seed(1) >>> samples = 10000 >>> truth = [1, 0] * int(samples / 2) >>> prediction = [] >>> for _ in range(int(samples / 2)): ... prediction.extend( ... [np.random.uniform(0, 0.2), np.random.uniform(0.8, 1.0)] ... ) >>> linkability(truth, prediction) 0.9747999999999999 >>> truth = [1, 0, 0, 0] * int(samples / 4) >>> prediction = [np.random.uniform(0, 1) for _ in range(samples)] >>> linkability(truth, prediction, omega=1 / 3) 0.0 """ # noqa: E501 mated_scores, non_mated_scores = _matching_scores(truth, prediction) # Limiting the number of bins # (100 maximum or lower if few scores available) if nbins is None: nbins = min(int(len(mated_scores) / 10), 100) # Define range of scores to compute D bin_edges = np.linspace( min([min(mated_scores), min(non_mated_scores)]), max([max(mated_scores), max(non_mated_scores)]), num=nbins + 1, endpoint=True, ) bin_centers = (bin_edges[1:] + bin_edges[:-1]) / 2 # Compute score distributions using normalized histograms y1 = np.histogram(mated_scores, bins=bin_edges, density=True)[0] y2 = np.histogram(non_mated_scores, bins=bin_edges, density=True)[0] # LR = P[s|mated ]/P[s|non-mated] lr = np.divide(y1, y2, out=np.ones_like(y1), where=y2 != 0) d = 2 * (omega * lr / (1 + omega * lr)) - 1 # Def of D d[omega * lr <= 1] = 0 # Taking care of inf/NaN mask = [True if y2[i] == 0 and y1[i] != 0 else False for i in range(len(y1))] d[mask] = 1 # Global measure using trapz numerical integration d_sys = np.trapezoid(x=bin_centers, y=d * y1) return float(d_sys)
def mean_absolute_error(
    truth: Sequence[float],
    prediction: Sequence[float],
) -> float:
    r"""Mean absolute error.

    .. math::

        \text{MAE} = \frac{1}{n} \sum^n_{i=1}
            |\text{prediction} - \text{truth}|

    Args:
        truth: ground truth values
        prediction: predicted values

    Returns:
        mean absolute error

    Raises:
        ValueError: if ``truth`` and ``prediction`` differ in length

    Examples:
        >>> mean_absolute_error([0, 0], [0, 1])
        0.5

    """
    assert_equal_length(truth, prediction)

    prediction = np.array(prediction)
    truth = np.array(truth)

    return float(np.abs(truth - prediction).mean(axis=0))

def mean_squared_error(
    truth: Sequence[float],
    prediction: Sequence[float],
) -> float:
    r"""Mean squared error.

    .. math::

        \text{MSE} = \frac{1}{n} \sum^n_{i=1}
            (\text{prediction} - \text{truth})^2

    Args:
        truth: ground truth values
        prediction: predicted values

    Returns:
        mean squared error

    Raises:
        ValueError: if ``truth`` and ``prediction`` differ in length

    Examples:
        >>> mean_squared_error([0, 0], [0, 1])
        0.5

    """
    assert_equal_length(truth, prediction)

    prediction = np.array(prediction)
    truth = np.array(truth)

    return float(np.square(truth - prediction).mean(axis=0))

def pearson_cc(
    truth: Sequence[float],
    prediction: Sequence[float],
) -> float:
    r"""Pearson correlation coefficient.

    .. math::

        \rho = \frac{\text{cov}(\text{prediction}, \text{truth})}
                    {\sigma_\text{prediction}\sigma_\text{truth}}

    where :math:`\sigma` is the standard deviation,
    and :math:`\text{cov}` is the covariance.

    Args:
        truth: ground truth values
        prediction: predicted values

    Returns:
        pearson correlation coefficient :math:`\in [-1, 1]`

    Raises:
        ValueError: if ``truth`` and ``prediction`` differ in length

    Examples:
        >>> pearson_cc([0, 1, 2], [0, 1, 1])
        0.8660254037844385

    """
    assert_equal_length(truth, prediction)

    if not isinstance(truth, np.ndarray):
        truth = np.array(list(truth))
    if not isinstance(prediction, np.ndarray):
        prediction = np.array(list(prediction))

    if len(prediction) < 2 or prediction.std() == 0:
        return np.nan
    else:
        return float(np.corrcoef(prediction, truth)[0][1])

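
# Illustrative sketch (not part of the audmetric source): a constant offset
# between prediction and truth leaves the Pearson correlation at 1.0, whereas
# the concordance correlation coefficient penalizes the mean shift.
#
#     >>> pearson_cc([0, 1, 2], [1, 2, 3])
#     1.0
#     >>> concordance_cc([0, 1, 2], [1, 2, 3])
#     0.5714285714285714
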
[docs]def precision_per_class( truth: Sequence[object], prediction: Sequence[object], labels: Sequence[object] = None, *, zero_division: float = 0, ) -> dict[str, float]: r"""Precision per class. .. math:: \text{precision}_k = \frac{\text{true positive}_k} {\text{true positive}_k + \text{false positive}_k} Args: truth: ground truth values/classes prediction: predicted values/classes labels: included labels in preferred ordering. If no labels are supplied, they will be inferred from :math:`\{\text{prediction}, \text{truth}\}` and ordered alphabetically. zero_division: set the value to return when there is a zero division Returns: dictionary with label as key and precision as value Examples: >>> precision_per_class([0, 0], [0, 1]) {0: 1.0, 1: 0.0} """ if labels is None: labels = infer_labels(truth, prediction) matrix = np.array(confusion_matrix(truth, prediction, labels)) total = matrix.sum(axis=0) old_settings = np.seterr(invalid="ignore") precision = matrix.diagonal() / total np.seterr(**old_settings) precision[np.isnan(precision)] = zero_division return {label: float(r) for label, r in zip(labels, precision)}
[docs]def recall_per_class( truth: Sequence[object], prediction: Sequence[object], labels: Sequence[object] = None, *, zero_division: float = 0, ) -> dict[str, float]: r"""Recall per class. .. math:: \text{recall}_k = \frac{\text{true positive}_k} {\text{true positive}_k + \text{false negative}_k} Args: truth: ground truth values/classes prediction: predicted values/classes labels: included labels in preferred ordering. If no labels are supplied, they will be inferred from :math:`\{\text{prediction}, \text{truth}\}` and ordered alphabetically. zero_division: set the value to return when there is a zero division Returns: dictionary with label as key and recall as value Examples: >>> recall_per_class([0, 0], [0, 1]) {0: 0.5, 1: 0.0} """ if labels is None: labels = infer_labels(truth, prediction) matrix = np.array(confusion_matrix(truth, prediction, labels)) total = matrix.sum(axis=1) old_settings = np.seterr(invalid="ignore") recall = matrix.diagonal() / total np.seterr(**old_settings) recall[np.isnan(recall)] = zero_division return {label: float(r) for label, r in zip(labels, recall)}
[docs]def unweighted_average_bias( truth: Sequence[object], prediction: Sequence[object], protected_variable: Sequence[object], labels: Sequence[object] = None, *, subgroups: Sequence[object] = None, metric: Callable[ [ Sequence[object], Sequence[object], Sequence[str] | None, ], dict[str, float], ] = fscore_per_class, reduction: Callable[ [ Sequence[float], ], float, ] = np.std, ) -> float: r"""Unweighted average bias of protected variable. The bias is measured in terms of *equalized odds* which requires the classifier to have identical performance for all classes independent of a protected variable such as race. The performance of the classifier for its different classes can be assessed with standard metrics such as *recall* or *precision*. The difference in performance, denoted as score divergence, can be computed in different ways, as well. For two subgroups the (absolute) difference serves as a standard choice. For more than two subgroups the score divergence could be estimated by the standard deviation of the scores. Note: If for a class less than two subgroups exhibit a performance score, the corresponding class is ignored in the bias computation. This occurs if there is no class sample for a subgroup, e.g. no negative (class label) female (subgroup of sex). Args: truth: ground truth classes prediction: predicted classes protected_variable: manifestations of protected variable such as subgroups "male" and "female" of variable "sex" labels: included labels in preferred ordering. The bias is computed only on the specified labels. If no labels are supplied, they will be inferred from :math:`\{\text{prediction}, \text{truth}\}` and ordered alphabetically. subgroups: included subgroups in preferred ordering. The direction of the bias is determined by the ordering of the subgroups. Besides, the bias is computed only on the specified subgroups. If no subgroups are supplied, they will be inferred from :math:`\text{protected\_variable}` and ordered alphanumerically. metric: metric which equalized odds are measured with. Typical choices are: :func:`audmetric.recall_per_class`, :func:`audmetric.precision_per_class` or :func:`audmetric.fscore_per_class` reduction: specifies the reduction operation to measure the divergence between the scores of the subgroups of the protected variable for each class. Typical choices are: difference or absolute difference between scores for two subgroups and standard deviation of scores for more than two subgroups. Returns: unweighted average bias Raises: ValueError: if ``truth``, ``prediction`` and ``protected_variable`` have different lengths ValueError: if ``subgroups`` contains values not contained in ``protected_variable`` Examples: >>> unweighted_average_bias([1, 1], [1, 0], ["male", "female"]) 0.5 >>> unweighted_average_bias( ... [1, 1], ... [1, 0], ... ["male", "female"], ... subgroups=["female", "male"], ... reduction=lambda x: x[0] - x[1], ... ) -1.0 >>> unweighted_average_bias( ... [0, 1], [1, 0], ["male", "female"], metric=recall_per_class ... ) nan >>> unweighted_average_bias( ... [0, 0, 0, 0], ... [1, 1, 0, 0], ... ["a", "b", "c", "d"], ... metric=recall_per_class, ... 
) 0.5 """ # noqa: E501 if labels is None: labels = infer_labels(truth, prediction) if not len(truth) == len(prediction) == len(protected_variable): raise ValueError( f"'truth', 'prediction' and 'protected_variable' should have " f"same lengths, but received '{len(truth)}', '{len(prediction)}' " f"and '{len(protected_variable)}'" ) if subgroups is None: subgroups = sorted(set(protected_variable)) scores = scores_per_subgroup_and_class( truth=truth, prediction=prediction, protected_variable=protected_variable, labels=labels, subgroups=subgroups, metric=metric, zero_division=np.nan, ) bias = 0.0 denominator = 0 for label in labels: scores_subgroup = [ scores[subgroup][label] for subgroup in subgroups if label in scores[subgroup] and not np.isnan(scores[subgroup][label]) ] # compute score divergence only where more than 1 score per class if len(scores_subgroup) > 1: bias += reduction(scores_subgroup) denominator += 1 if denominator == 0: return np.nan return float(bias / denominator)
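
# Illustrative sketch (not part of the audmetric source): measuring a recall
# gap between two subgroups. Class 1 is always recalled for "female" (1.0)
# but only half of the time for "male" (0.5); the default reduction
# ``np.std`` turns this gap into a bias of 0.25.
#
#     >>> unweighted_average_bias(
#     ...     [1, 1, 1, 1],
#     ...     [1, 1, 1, 0],
#     ...     ["female", "female", "male", "male"],
#     ...     metric=recall_per_class,
#     ... )
#     0.25
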
[docs]def unweighted_average_fscore( truth: Sequence[object], prediction: Sequence[object], labels: Sequence[object] = None, *, zero_division: float = 0, ) -> float: r"""Unweighted average F-score. .. math:: \text{UAF} = \frac{1}{K} \sum^K_{k=1} \frac{\text{true positive}_k} {\text{true positive}_k + \frac{1}{2} (\text{false positive}_k + \text{false negative}_k)} Args: truth: ground truth values/classes prediction: predicted values/classes labels: included labels in preferred ordering. If no labels are supplied, they will be inferred from :math:`\{\text{prediction}, \text{truth}\}` and ordered alphabetically. zero_division: set the value to return when there is a zero division Returns: unweighted average f-score Examples: >>> unweighted_average_fscore([0, 0], [0, 1]) 0.3333333333333333 """ fscore = fscore_per_class( truth, prediction, labels, zero_division=zero_division, ) fscore = np.array(list(fscore.values())) return float(fscore.mean())
[docs]def unweighted_average_precision( truth: Sequence[object], prediction: Sequence[object], labels: Sequence[object] = None, *, zero_division: float = 0, ) -> float: r"""Unweighted average precision. .. math:: \text{UAP} = \frac{1}{K} \sum^K_{k=1} \frac{\text{true positive}_k} {\text{true positive}_k + \text{false positive}_k} Args: truth: ground truth values/classes prediction: predicted values/classes labels: included labels in preferred ordering. If no labels are supplied, they will be inferred from :math:`\{\text{prediction}, \text{truth}\}` and ordered alphabetically. zero_division: set the value to return when there is a zero division Returns: unweighted average precision Examples: >>> unweighted_average_precision([0, 0], [0, 1]) 0.5 """ precision = precision_per_class( truth, prediction, labels, zero_division=zero_division, ) precision = np.array(list(precision.values())) return float(precision.mean())
[docs]def unweighted_average_recall( truth: Sequence[object], prediction: Sequence[object], labels: Sequence[object] = None, *, zero_division: float = 0, ) -> float: r"""Unweighted average recall. .. math:: \text{UAR} = \frac{1}{K} \sum^K_{k=1} \frac{\text{true positive}_k} {\text{true positive}_k + \text{false negative}_k} Args: truth: ground truth values/classes prediction: predicted values/classes labels: included labels in preferred ordering. If no labels are supplied, they will be inferred from :math:`\{\text{prediction}, \text{truth}\}` and ordered alphabetically. zero_division: set the value to return when there is a zero division Returns: unweighted average recall Examples: >>> unweighted_average_recall([0, 0], [0, 1]) 0.25 """ recall = recall_per_class( truth, prediction, labels, zero_division=zero_division, ) recall = np.array(list(recall.values())) return float(recall.mean())
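
# Illustrative sketch (not part of the audmetric source): on imbalanced data
# a majority-class prediction scores a high accuracy but only a chance-level
# unweighted average recall, which is why UAR is preferred for such tasks.
#
#     >>> truth = [0, 0, 0, 1]
#     >>> prediction = [0, 0, 0, 0]
#     >>> accuracy(truth, prediction)
#     0.75
#     >>> unweighted_average_recall(truth, prediction)
#     0.5
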
[docs]def weighted_confusion_error( truth: Sequence[object], prediction: Sequence[object], weights: Sequence[Sequence[int | float]], labels: Sequence[object] = None, ) -> float: r"""Weighted confusion error. Computes the normalized confusion matrix, applies given weights to each cell and sums them up. Weights are expected as positive numbers and will be normalized by the sum of all weights. The higher the weight, the more costly will be the error. A weight of 0 means that the cell is not taken into account for the error, this is usually the case for the diagonal as it holds correctly classified samples. Args: truth: ground truth values/classes prediction: predicted values/classes weights: weights applied to the confusion matrix. Expected as a list of lists in the following form (r=row, c=column): ``[[w_r0_c0, ..., w_r0_cN], ..., [w_rN_c0, ..., w_rN_cN]]`` labels: included labels in preferred ordering. If no labels are supplied, they will be inferred from :math:`\{\text{prediction}, \text{truth}\}` and ordered alphabetically. Returns: weighted confusion error Examples: >>> truth = [0, 1, 2] >>> prediction = [0, 2, 0] >>> # penalize only errors > 1 >>> weights = [[0, 0, 1], [0, 0, 0], [1, 0, 0]] >>> weighted_confusion_error(truth, prediction, weights) 0.5 """ weights = weights / np.sum(weights) cm = confusion_matrix(truth, prediction, labels, normalize=True) cm = np.array(cm) if not cm.shape == weights.shape: raise ValueError( "Shape of weights " f"{weights.shape} " "does not match shape of confusion matrix " f"{cm.shape}." ) weighted_cm = cm * weights return float(np.sum(weighted_cm))
[docs]def word_error_rate(
    truth: Sequence[Sequence[str]],
    prediction: Sequence[Sequence[str]],
    *,
    norm: str = "truth",
) -> float:
    r"""Word error rate based on edit distance.

    The word error rate is computed
    by summing the normalized edit distance
    of each (truth, prediction)-pair
    and dividing the sum by the number of pairs.

    The normalized edit distance of a pair
    is the edit distance
    divided by a normalization factor :math:`n`,
    i.e. the average editing cost per sequence item.
    The value of :math:`n` depends on the ``norm`` parameter.

    If ``norm`` is ``"truth"``,
    :math:`n` is the number of words in the reference (truth),
    following the Wikipedia formulation of the word error rate.
    In this case the word error rate can be greater than 1
    if the prediction sequence is longer than the reference:

    .. math::
        n = \text{len}(t)

    If ``norm`` is ``"longest"``,
    :math:`n` is the maximum length of truth and prediction:

    .. math::
        n = \max(\text{len}(t), \text{len}(p))

    Args:
        truth: ground truth strings
        prediction: predicted strings
        norm: normalization method,
            either ``"truth"`` or ``"longest"``.
            ``"truth"`` normalizes by the truth length,
            ``"longest"`` normalizes by the maximum length
            of truth and prediction

    Returns:
        word error rate

    Raises:
        ValueError: if ``truth`` and ``prediction`` differ in length
        ValueError: if ``norm`` is not one of ``"truth"``, ``"longest"``

    Examples:
        >>> truth = [["lorem", "ipsum"], ["north", "wind", "and", "sun"]]
        >>> prediction = [["lorm", "ipsum"], ["north", "wind"]]
        >>> word_error_rate(truth, prediction)
        0.5
        >>> truth = [["hello", "world"]]
        >>> prediction = [["xyz", "moon", "abc"]]
        >>> word_error_rate(truth, prediction)
        1.5
        >>> word_error_rate(truth, prediction, norm="longest")
        1.0

    """
    assert_equal_length(truth, prediction)

    if norm not in ["truth", "longest"]:
        raise ValueError(f"'norm' must be one of 'truth', 'longest', got '{norm}'")

    wer = 0.0

    for t, p in zip(truth, prediction):
        # Map words to ints
        unique_words = set(t).union(set(p))
        word_to_int = {word: index for index, word in enumerate(unique_words)}
        t = [word_to_int[word] for word in t]
        p = [word_to_int[word] for word in p]

        if norm == "longest":
            n = max(len(t), len(p))
        else:
            n = len(t)
        # Avoid division by zero for empty sequences
        n = max(n, 1)

        wer += edit_distance(t, p) / n

    num_samples = max(len(truth), 1)
    return float(wer / num_samples)
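

# Editor's note: an illustration only, not part of the audmetric API. A minimal
# sketch of the normalization step of ``word_error_rate`` for a single
# (truth, prediction) pair, using the same integer encoding of words and the
# same ``edit_distance`` call as the function above.
def _example_word_error_rate_normalization() -> tuple[float, float]:
    t = ["hello", "world"]
    p = ["xyz", "moon", "abc"]
    word_to_int = {word: index for index, word in enumerate(set(t) | set(p))}
    distance = edit_distance(
        [word_to_int[word] for word in t],
        [word_to_int[word] for word in p],
    )
    # No words are shared, so 2 substitutions + 1 insertion -> distance == 3
    # norm="truth": 3 / 2 = 1.5, norm="longest": 3 / 3 = 1.0
    return distance / len(t), distance / max(len(t), len(p))

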
def _event_metric_per_class(
    truth: pd.Series,
    prediction: pd.Series,
    labels: Sequence[object] | None,
    zero_division: float,
    onset_tolerance: float | None,
    offset_tolerance: float | None,
    duration_tolerance: float | None,
    axis: int,  # 0=precision, 1=recall
) -> dict[str, float]:
    r"""Per-class precision (``axis=0``) or recall (``axis=1``) from the event-based confusion matrix."""
    if labels is None:
        labels = infer_labels(truth, prediction)
    cm = np.array(
        event_confusion_matrix(
            truth,
            prediction,
            labels,
            onset_tolerance=onset_tolerance,
            offset_tolerance=offset_tolerance,
            duration_tolerance=duration_tolerance,
        )
    )
    totals = cm.sum(axis=axis)
    vals = cm.diagonal() / totals
    vals = np.nan_to_num(vals, nan=zero_division)
    # The event-based confusion matrix also has a row/column
    # for the "no event" class (i.e. the absence of a segment),
    # but we only return the precision/recall values for the given labels
    return {lab: float(vals[i]) for i, lab in enumerate(labels)}


def _matching_scores(
    truth: (bool | int | Sequence[bool | int]),
    prediction: (bool | int | float | Sequence[bool | int | float]),
) -> tuple[np.ndarray, np.ndarray]:
    r"""Mated and non-mated scores for verification tasks.

    For verification tasks,
    predictions are usually separated into
    the predictions belonging to the matching examples
    and all other predictions.
    The former are called mated scores or genuine matching scores,
    the latter non-mated scores or impostor matching scores.
    For example, in a speaker verification task
    the mated scores are all similarity values
    that belong to the matching speaker.

    Args:
        truth: ground truth classes
        prediction: predicted classes or similarity scores

    Returns:
        * mated scores
        * non-mated scores

    Raises:
        ValueError: if ``truth`` contains values
            different from ``1, 0, True, False``

    Examples:
        >>> truth = [1, 0]
        >>> prediction = [0.9, 0.1]
        >>> _matching_scores(truth, prediction)
        (array([0.9]), array([0.1]))

    """
    truth = np.array(truth)

    allowed_truth_values = {1, 0, True, False}
    if not set(truth).issubset(allowed_truth_values):
        raise ValueError(
            "'truth' is only allowed to contain "
            "[1, 0, True, False], "
            "yours contains:\n"
            f"[{', '.join([str(t) for t in set(truth)])}]"
        )
    truth = truth.astype(bool)
    prediction = np.array(prediction).astype(np.float64)

    # Predictions for all matching examples
    # (truth is 1 or True)
    # In literature these are called
    # "genuine matching scores"
    # or "mated scores"
    mated_scores = prediction[truth]

    # Predictions for all non-matching examples
    # (truth is 0 or False)
    # In literature these are called
    # "impostor matching scores"
    # or "non-mated scores"
    non_mated_scores = prediction[~truth]

    return mated_scores, non_mated_scores


def _check_common_files(truth: pd.Series, prediction: pd.Series):
    r"""Warn the user if there are no common files between truth and prediction."""
    if (
        len(truth) > 0
        and len(
            truth.index.get_level_values(FILE)
            .unique()
            .intersection(prediction.index.get_level_values(FILE).unique())
        )
        == 0
    ):
        warnings.warn(
            message="There are no common files shared between truth and prediction.",
            category=UserWarning,
            stacklevel=2,
        )


def _cooccurrence(
    truth: pd.Series,
    prediction: pd.Series,
    truth_label2index: dict[str, int],
    prediction_label2index: dict[str, int],
    num_workers: int = 1,
    multiprocessing: bool = False,
) -> np.ndarray:
    r"""Get the cooccurrence duration of the labels given in truth and prediction."""
    files = truth.index.get_level_values(FILE).unique()
    if len(files) > 0:
        results = audeer.run_tasks(
            _file_cooccurrence,
            params=[
                (
                    (
                        truth[truth.index.get_level_values(FILE) == file],
                        prediction[prediction.index.get_level_values(FILE) == file],
                        truth_label2index,
                        prediction_label2index,
                    ),
                    {},
                )
                for file in files
            ],
            num_workers=num_workers,
            multiprocessing=multiprocessing,
        )
        return sum(results)
    else:
        return np.zeros((len(truth_label2index), len(prediction_label2index)))


def _diarization_mapper(
    truth: pd.Series,
    prediction: pd.Series,
    individual_file_mapping: bool = False,
    num_workers: int = 1,
    multiprocessing: bool = False,
) -> dict[object, object]:
    r"""Return a mapping from prediction label to truth label based on cooccurrence.

    Based on code at
    https://github.com/pyannote/pyannote-metrics/blob/d785d78fcbce89c890a957b7f90f17ac41b0fc21/src/pyannote/metrics/matcher.py#L172
    """
    truth_labels = sorted(truth.unique())
    n_truth = len(truth)
    prediction_labels = sorted(prediction.unique())
    n_pred = len(prediction)
    truth_label2index = {label: i for i, label in enumerate(truth_labels)}
    prediction_label2index = {label: i for i, label in enumerate(prediction_labels)}

    if individual_file_mapping:
        # In case mappings should be done for each file individually,
        # call this function for each file separately.
        # Then merge the resulting dictionaries.
        files = truth.index.get_level_values(FILE).unique()
        file_mappings = audeer.run_tasks(
            _diarization_mapper,
            params=[
                (
                    (
                        truth[truth.index.get_level_values(FILE) == file],
                        prediction[prediction.index.get_level_values(FILE) == file],
                    ),
                    {"individual_file_mapping": False},
                )
                for file in files
            ],
            num_workers=num_workers,
            multiprocessing=multiprocessing,
        )
        # Labels don't overlap between files,
        # so we don't have overlapping keys in the resulting mappings
        # and we can combine the result by updating the dictionary
        mapping = {}
        for file_mapping in file_mappings:
            mapping.update(file_mapping)
    else:
        cooccurrence = _cooccurrence(
            truth,
            prediction,
            truth_label2index,
            prediction_label2index,
            num_workers=num_workers,
            multiprocessing=multiprocessing,
        )
        mapping = {}
        for _ in range(min(n_truth, n_pred)):
            # Indices of the maximal element of cooccurrence
            i_truth, i_pred = np.unravel_index(
                np.argmax(cooccurrence), cooccurrence.shape
            )
            if cooccurrence[i_truth, i_pred] > 0:
                mapping[prediction_labels[i_pred]] = truth_labels[i_truth]
                # Since these two labels have been matched,
                # we set their entries in the cooccurrence matrix to zero
                cooccurrence[i_truth, :] = 0.0
                cooccurrence[:, i_pred] = 0.0
                continue
            break
    return mapping


def _file_cooccurrence(
    file_truth: pd.Series,
    file_prediction: pd.Series,
    truth_label2index: dict[str, int],
    prediction_label2index: dict[str, int],
) -> np.ndarray:
    r"""Cooccurrence duration of the labels in truth and prediction for one file."""
    matrix = np.zeros((len(truth_label2index), len(prediction_label2index)))
    for (_, start, end), label in file_truth.items():
        intersecting = _intersecting_segments(start, end, file_prediction)
        for (_, other_start, other_end), other_label in intersecting.items():
            shared_duration = _overlap_duration(start, end, other_start, other_end)
            matrix[truth_label2index[label], prediction_label2index[other_label]] += (
                shared_duration
            )
    return matrix


def _file_ier(
    file_truth: pd.Series, file_prediction: pd.Series
) -> tuple[float, float, float, float]:
    r"""Compute IER-relevant components for one file.

Args: file_truth: true segments of one file file_prediction: predicted segments of one file Returns: tuple of confusion, false alarm, misses, total duration """ file_confusion = 0 file_false_alarm = 0 file_misses = 0 file_duration = 0 # Get subsegments formed by truth and prediction segment boundaries # Example: # truth |------| |------| # |ooo| # prediction |--------| |-----| # Result: # starts/ends | | | | | | | | boundaries = ( _segment_boundaries(file_truth) .union(_segment_boundaries(file_prediction)) .unique() ) boundaries = boundaries.sort_values() starts = boundaries[:-1] ends = boundaries[1:] # List of (unique) labels that occur in this window in the truth truth_label_lists = _subsegment_labels(file_truth, starts, ends) # List of (unique) labels that occur in this window in the prediction prediction_label_lists = _subsegment_labels(file_prediction, starts, ends) for start, end, truth_labels, prediction_labels in zip( starts, ends, truth_label_lists, prediction_label_lists ): if len(truth_labels) == 0 and len(prediction_labels) == 0: continue duration = (end - start).total_seconds() # Overlap between truth and predicted labels in this window correct_labels = [lab for lab in truth_labels if lab in prediction_labels] # Unmatched truth labels in this window extra_truth_labels = [lab for lab in truth_labels if lab not in correct_labels] # Unmatched predicted labels in this window extra_pred_labels = [ lab for lab in prediction_labels if lab not in correct_labels ] n_confusion = min(len(extra_truth_labels), len(extra_pred_labels)) n_false_alarm = 0 n_misses = 0 # More unmatched prediction labels than truth labels -> add to false alarms if len(extra_pred_labels) > len(extra_truth_labels): n_false_alarm = len(extra_pred_labels) - len(extra_truth_labels) # More unmatched truth labels than prediction labels -> add to misses elif len(extra_truth_labels) > len(extra_pred_labels): n_misses = len(extra_truth_labels) - len(extra_pred_labels) file_confusion += duration * n_confusion file_false_alarm += duration * n_false_alarm file_misses += duration * n_misses file_duration += duration * len(truth_labels) return file_confusion, file_false_alarm, file_misses, file_duration def _intersecting_segments( start: pd.Timedelta, end: pd.Timedelta, other_segments: pd.Series ) -> pd.Series: r"""Return sorted segments that intersect with the given start and end.""" other_segments = other_segments.sort_index() return other_segments[ ( (start < other_segments.index.get_level_values(START)) & (other_segments.index.get_level_values(START) < end) ) | ( (start > other_segments.index.get_level_values(START)) & (start < other_segments.index.get_level_values(END)) ) | (start == other_segments.index.get_level_values(START)) ] def _overlap_duration( start: pd.Timedelta, end: pd.Timedelta, other_start: pd.Timedelta, other_end: pd.Timedelta, ) -> float: r"""Duration of overlap between two time windows in seconds.""" return max(0.0, (min(end, other_end) - max(start, other_start)).total_seconds()) def _segment_boundaries(segments: pd.Series) -> pd.Index: r"""Get the unique segment boundaries present in the given segments.""" starts = segments.index.get_level_values(START) ends = segments.index.get_level_values(END) boundaries = starts.union(ends).unique() boundaries = boundaries.sort_values() return boundaries def _subsegment_labels( segments: pd.Series, starts: pd.Index, ends: pd.Index ) -> list[list[object]]: r"""Return label lists that occur at each subsegment. 
    The result is a list whose elements correspond to the subsegments
    given by the ``starts`` and ``ends`` times.
    Each element contains the list of labels
    that occur in ``segments`` during that subsegment.
    """
    subsegment_labels = []
    for start, end in zip(starts, ends):
        intersection = _intersecting_segments(start, end, segments)
        labels = sorted(intersection.unique())
        subsegment_labels.append(labels)
    return subsegment_labels


def _segments_overlap(
    start_t: float,
    end_t: float,
    start_p: float,
    end_p: float,
    onset_tol: float | None,
    offset_tol: float | None,
    duration_tol: float | None,
) -> bool:
    r"""Check if a predicted segment matches a truth segment within tolerances.

    The onset is only checked if ``onset_tol`` is not ``None``.
    The offset has to match within the effective offset tolerance,
    which is the maximum of ``offset_tol``
    and ``duration_tol`` times the duration of the truth segment
    (``0.0`` if both are ``None``).
    """
    if onset_tol is not None:
        if math.fabs(start_t - start_p) > onset_tol:
            return False
    # Compute the effective offset tolerance
    eff_off = offset_tol or 0.0
    if duration_tol is not None:
        eff_off = max(eff_off, duration_tol * (end_t - start_t))
    if math.fabs(end_t - end_p) > eff_off:
        return False
    return True
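

# Editor's note: an illustration only, not part of the audmetric API. A
# hypothetical sketch of the tolerance logic in ``_segments_overlap`` with
# segment boundaries given in seconds: a fixed offset tolerance of 0.3 s rejects
# the match, while an additional duration tolerance of 30% of the truth
# duration (0.6 s here) accepts it.
def _example_segments_overlap() -> tuple[bool, bool]:
    # Truth event from 0.0 s to 2.0 s, predicted event from 0.1 s to 2.5 s
    fixed_offset = _segments_overlap(
        0.0, 2.0, 0.1, 2.5,
        onset_tol=0.2,  # |0.0 - 0.1| <= 0.2 -> onset accepted
        offset_tol=0.3,  # |2.0 - 2.5| > 0.3 -> offset rejected
        duration_tol=None,
    )
    duration_offset = _segments_overlap(
        0.0, 2.0, 0.1, 2.5,
        onset_tol=0.2,
        offset_tol=0.3,
        duration_tol=0.3,  # effective offset tolerance max(0.3, 0.3 * 2.0) = 0.6
    )
    return fixed_offset, duration_offset  # (False, True)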