Source code for audinterface.core.segment_with_feature

from __future__ import annotations

from collections.abc import Callable
from collections.abc import Sequence
import errno
import os

import numpy as np
import pandas as pd

import audeer
import audformat

from audinterface.core.typing import Timestamp
from audinterface.core.typing import Timestamps
import audinterface.core.utils as utils


def empty_series(signal, sampling_rate, **kwargs) -> pd.Series:
    r"""Fallback process function that reports no segments.

    ``SegmentWithFeature`` installs this function
    when it is created with ``process_func=None``.
    The arguments are only accepted
    to match the signature of a process function
    and are not used.
    The result is a series without entries,
    having an empty multi-index
    with the levels ``start`` and ``end``.

    Args:
        signal: signal
        sampling_rate: sampling rate in Hz
        **kwargs: additional keyword arguments of the processing function

    Returns:
        empty series

    """
    return pd.Series(index=utils.signal_index())


class SegmentWithFeature:
    r"""Segmentation with feature interface.

    Interface for functions that apply a segmentation to the input signal,
    and also compute features for those segments at the same time,
    e.g. a speech recognition model that recognizes speech
    and also provides the time stamps of that speech.

    The features are returned as a :class:`pandas.DataFrame`
    with ``num_features`` columns and one row per detected segment.

    Args:
        feature_names: features are stored as columns in a data frame,
            where ``feature_names`` defines the names of the columns.
        name: name of the feature set, e.g. ``'stft'``
        params: parameters that describe the feature set,
            e.g. ``{'win_size': 512, 'hop_size': 256, 'num_fft': 512}``.
            With the parameters you can differentiate different flavors of
            the same feature set
        process_func: segmentation with feature function,
            which expects the two positional arguments ``signal``
            and ``sampling_rate``
            and any number of additional keyword arguments
            (see ``process_func_args``).
            There are the following special arguments:
            ``'idx'``, ``'file'``, ``'root'``.
            If expected by the function,
            but not specified in ``process_func_args``,
            they will be replaced with:
            a running index,
            the currently processed file,
            the root folder.
            Must return a :class:`pandas.Series`
            with a :class:`pandas.MultiIndex` with two levels
            named `start` and `end` that hold start and end
            positions as :class:`pandas.Timedelta` objects,
            and with elements in the shape of ``(num_features)``.
        process_func_args: (keyword) arguments passed on
            to the processing function
        sampling_rate: sampling rate in Hz.
            If ``None`` it will call ``process_func`` with the actual
            sampling rate of the signal
        resample: if ``True`` enforces given sampling rate by resampling
        channels: channel selection, see :func:`audresample.remix`
        mixdown: apply mono mix-down on selection
        min_signal_dur: minimum signal length
            required by ``process_func``.
            If value is a float or integer
            it is treated as seconds.
            See :func:`audinterface.utils.to_timedelta` for further options.
            If provided signal is shorter,
            it will be zero padded at the end
        max_signal_dur: maximum signal length
            required by ``process_func``.
            If value is a float or integer
            it is treated as seconds.
            See :func:`audinterface.utils.to_timedelta` for further options.
            If provided signal is longer,
            it will be cut at the end
        keep_nat: if the end of segment is set to ``NaT`` do not replace
            with file duration in the result
        num_workers: number of parallel jobs or 1 for sequential
            processing. If ``None`` will be set to the number of
            processors on the machine multiplied by 5 in case of
            multithreading and number of processors in case of
            multiprocessing
        multiprocessing: use multiprocessing instead of multithreading
        verbose: show debug messages

    Raises:
        ValueError: if ``resample = True``, but ``sampling_rate = None``

    Examples:
        >>> def segment_with_mean_std(signal, sampling_rate, *, win_size=1.0, hop_size=1.0):
        ...     size = signal.shape[1] / sampling_rate
        ...     starts = pd.to_timedelta(
        ...         np.arange(0, size - win_size + (1 / sampling_rate), hop_size),
        ...         unit="s",
        ...     )
        ...     ends = pd.to_timedelta(
        ...         np.arange(win_size, size + (1 / sampling_rate), hop_size), unit="s"
        ...     )
        ...     # Get windows of shape (channels, samples, frames)
        ...     frames = utils.sliding_window(signal, sampling_rate, win_size, hop_size)
        ...     means = frames.mean(axis=(0, 1))
        ...     stds = frames.std(axis=(0, 1))
        ...     index = pd.MultiIndex.from_tuples(zip(starts, ends), names=["start", "end"])
        ...     features = list(np.stack((means, stds), axis=-1))
        ...     return pd.Series(data=features, index=index)
        >>> interface = SegmentWithFeature(
        ...     feature_names=["mean", "std"], process_func=segment_with_mean_std
        ... )
        >>> signal = np.array([1.0, 2.0, 3.0, 4.0, 5.0, 6.0])
        >>> interface(signal, sampling_rate=2)
        start            end
        0 days 00:00:00  0 days 00:00:01    [1.5, 0.5]
        0 days 00:00:01  0 days 00:00:02    [3.5, 0.5]
        0 days 00:00:02  0 days 00:00:03    [5.5, 0.5]
        dtype: object
        >>> interface.process_signal(signal, sampling_rate=2)
                                         mean  std
        start           end
        0 days 00:00:00 0 days 00:00:01   1.5  0.5
        0 days 00:00:01 0 days 00:00:02   3.5  0.5
        0 days 00:00:02 0 days 00:00:03   5.5  0.5
        >>> # Apply interface on an audformat conform index of a dataframe
        >>> import audb
        >>> db = audb.load(
        ...     "emodb",
        ...     version="1.3.0",
        ...     media="wav/03a01Fa.wav",
        ...     full_path=False,
        ...     verbose=False,
        ... )
        >>> index = db["emotion"].index
        >>> interface.process_index(index, root=db.root)
                                                    mean       std
        file            start  end
        wav/03a01Fa.wav 0 days 0 days 00:00:01 -0.000329  0.098115

    """  # noqa: E501

    def __init__(
        self,
        feature_names: str | Sequence[str],
        *,
        name: str | None = None,
        params: dict[str, object] | None = None,
        process_func: Callable[..., pd.Series] | None = None,
        process_func_args: dict[str, object] | None = None,
        sampling_rate: int | None = None,
        resample: bool = False,
        channels: int | Sequence[int] = 0,
        mixdown: bool = False,
        min_signal_dur: Timestamp | None = None,
        max_signal_dur: Timestamp | None = None,
        keep_nat: bool = False,
        num_workers: int | None = 1,
        multiprocessing: bool = False,
        verbose: bool = False,
    ):
        # avoid cycling imports
        from audinterface.core.process import Process

        feature_names = audeer.to_list(feature_names)
        process_func_args = process_func_args or {}
        if process_func is None:
            # Build a new dict instead of writing into the one
            # handed in by the caller,
            # so the caller's object is left untouched
            process_func_args = {
                **process_func_args,
                "feature_names": feature_names,
            }
            process_func = empty_series

        process = Process(
            process_func=process_func,
            process_func_args=process_func_args,
            sampling_rate=sampling_rate,
            resample=resample,
            channels=channels,
            mixdown=mixdown,
            min_signal_dur=min_signal_dur,
            max_signal_dur=max_signal_dur,
            keep_nat=keep_nat,
            num_workers=num_workers,
            multiprocessing=multiprocessing,
            verbose=verbose,
        )

        self.feature_names = feature_names
        r"""Feature names."""

        self.name = name
        r"""Name of the feature set."""

        self.num_features = len(feature_names)
        r"""Number of features."""

        self.params = params
        r"""Dictionary of parameters describing the feature set."""

        self.process = process
        r"""Processing object."""

    def process_file(
        self,
        file: str,
        *,
        start: Timestamp | None = None,
        end: Timestamp | None = None,
        root: str | None = None,
        process_func_args: dict[str, object] | None = None,
    ) -> pd.DataFrame:
        r"""Segment the content of an audio file and extract features.

        Args:
            file: file path
            start: start processing at this position.
                If value is a float or integer it is treated as seconds.
                See :func:`audinterface.utils.to_timedelta` for further options
            end: end processing at this position.
                If value is a float or integer it is treated as seconds.
                See :func:`audinterface.utils.to_timedelta` for further options
            root: root folder to expand relative file path
            process_func_args: (keyword) arguments passed on
                to the processing function.
                They will temporarily overwrite
                the ones stored in
                :attr:`audinterface.SegmentWithFeature.process.process_func_args`

        Returns:
            :class:`pandas.DataFrame` with segmented index conform to audformat_

        Raises:
            RuntimeError: if sampling rates do not match
            RuntimeError: if channel selection is invalid
            ValueError: if the process function doesn't return a
                :class:`pd.Series` with index conform to audformat_
                and elements of shape ``(num_features)``

        .. _audformat: https://audeering.github.io/audformat/data-format.html

        """
        # A missing start is replaced by 0,
        # presumably so that the start stored in the processing result
        # can be added to the segment offsets
        # when the frame is constructed
        if start is None or pd.isna(start):
            start = pd.to_timedelta(0)
        y = self.process.process_file(
            file,
            start=start,
            end=end,
            root=root,
            process_func_args=process_func_args,
        )
        return self._construct_frame_segmented_index(y)

    def process_files(
        self,
        files: Sequence[str],
        *,
        starts: Timestamps | None = None,
        ends: Timestamps | None = None,
        root: str | None = None,
        process_func_args: dict[str, object] | None = None,
    ) -> pd.DataFrame:
        r"""Segment and extract features for a list of files.

        Args:
            files: list of file paths
            starts: segment start positions.
                Time values given as float or integers are treated as seconds.
                See :func:`audinterface.utils.to_timedelta`
                for further options.
                If a scalar is given, it is applied to all files
            ends: segment end positions.
                Time values given as float or integers are treated as seconds
                See :func:`audinterface.utils.to_timedelta`
                for further options.
                If a scalar is given, it is applied to all files
            root: root folder to expand relative file paths
            process_func_args: (keyword) arguments passed on
                to the processing function.
                They will temporarily overwrite
                the ones stored in
                :attr:`audinterface.SegmentWithFeature.process.process_func_args`

        Returns:
            :class:`pandas.DataFrame` with segmented index conform to audformat_

        Raises:
            RuntimeError: if sampling rates do not match
            RuntimeError: if channel selection is invalid
            ValueError: if the process function doesn't return a
                :class:`pd.Series` with index conform to audformat_
                and elements of shape ``(num_features)``

        .. _audformat: https://audeering.github.io/audformat/data-format.html

        """
        y = self.process.process_files(
            files,
            starts=starts,
            ends=ends,
            root=root,
            process_func_args=process_func_args,
        )
        return self._construct_frame_segmented_index(y)

    def process_folder(
        self,
        root: str,
        *,
        filetype: str = "wav",
        include_root: bool = True,
        process_func_args: dict[str, object] | None = None,
    ) -> pd.DataFrame:
        r"""Segment and extract features for files in a folder.

        .. note:: At the moment does not scan in sub-folders!

        Args:
            root: root folder
            filetype: file extension
            include_root: if ``True``
                the file paths are absolute
                in the index of the returned result
            process_func_args: (keyword) arguments passed on
                to the processing function.
                They will temporarily overwrite
                the ones stored in
                :attr:`audinterface.SegmentWithFeature.process.process_func_args`

        Returns:
            :class:`pandas.DataFrame` with segmented index conform to audformat_

        Raises:
            FileNotFoundError: if folder does not exist
            RuntimeError: if sampling rates do not match
            RuntimeError: if channel selection is invalid
            ValueError: if the process function doesn't return a
                :class:`pd.Series` with index conform to audformat_
                and elements of shape ``(num_features)``

        .. _audformat: https://audeering.github.io/audformat/data-format.html

        """
        root = audeer.path(root)
        if not os.path.exists(root):
            raise FileNotFoundError(
                errno.ENOENT,
                os.strerror(errno.ENOENT),
                root,
            )

        files = audeer.list_file_names(
            root,
            filetype=filetype,
            basenames=not include_root,
        )
        return self.process_files(
            files,
            root=root,
            process_func_args=process_func_args,
        )

    def process_index(
        self,
        index: pd.Index,
        *,
        root: str | None = None,
        cache_root: str | None = None,
        process_func_args: dict[str, object] | None = None,
    ) -> pd.DataFrame:
        r"""Segment and extract features for files or segments from an index.

        If ``cache_root`` is not ``None``,
        a hash value is created from the index
        using :func:`audformat.utils.hash` and
        the result is stored as
        ``<cache_root>/<hash>.pkl``.
        When called again with the same index,
        results will be read from the cached file.

        Args:
            index: index conform to audformat_
            root: root folder to expand relative file paths
            cache_root: cache folder (see description)
            process_func_args: (keyword) arguments passed on
                to the processing function.
                They will temporarily overwrite
                the ones stored in
                :attr:`audinterface.SegmentWithFeature.process.process_func_args`

        Returns:
            :class:`pandas.DataFrame` with segmented index conform to audformat_

        Raises:
            RuntimeError: if sampling rates do not match
            RuntimeError: if channel selection is invalid
            ValueError: if the process function doesn't return a
                :class:`pd.Series` with index conform to audformat_
                and elements of shape ``(num_features)``

        .. _audformat: https://audeering.github.io/audformat/data-format.html

        """
        index = audformat.utils.to_segmented_index(index)
        utils.assert_index(index)

        y = self.process.process_index(
            index,
            preserve_index=False,
            root=root,
            cache_root=cache_root,
            process_func_args=process_func_args,
        )
        return self._construct_frame_segmented_index(y)

    def process_signal(
        self,
        signal: np.ndarray,
        sampling_rate: int,
        *,
        file: str | None = None,
        start: Timestamp | None = None,
        end: Timestamp | None = None,
        process_func_args: dict[str, object] | None = None,
    ) -> pd.DataFrame:
        r"""Segment and extract features for audio signal.

        .. note:: If a ``file`` is given, the index of the returned frame
            has levels ``file``, ``start`` and ``end``.
            Otherwise, it consists only of ``start`` and ``end``.

        Args:
            signal: signal values
            sampling_rate: sampling rate in Hz
            file: file path
            start: start processing at this position.
                If value is a float or integer it is treated as seconds.
                See :func:`audinterface.utils.to_timedelta` for further options
            end: end processing at this position.
                If value is a float or integer it is treated as seconds.
                See :func:`audinterface.utils.to_timedelta` for further options
            process_func_args: (keyword) arguments passed on
                to the processing function.
                They will temporarily overwrite
                the ones stored in
                :attr:`audinterface.SegmentWithFeature.process.process_func_args`

        Returns:
            :class:`pandas.DataFrame` with segmented index conform to audformat_

        Raises:
            RuntimeError: if sampling rates do not match
            RuntimeError: if channel selection is invalid
            ValueError: if the process function doesn't return a
                :class:`pd.Series` with index conform to audformat_
                and elements of shape ``(num_features)``

        .. _audformat: https://audeering.github.io/audformat/data-format.html

        """
        y = self.process.process_signal(
            signal,
            sampling_rate,
            file=file,
            start=start,
            end=end,
            process_func_args=process_func_args,
        )
        if file is None:
            return self._construct_frame_signal_index(y)
        return self._construct_frame_segmented_index(y)

    def process_signal_from_index(
        self,
        signal: np.ndarray,
        sampling_rate: int,
        index: pd.Index,
        process_func_args: dict[str, object] | None = None,
    ) -> pd.DataFrame:
        r"""Segment and extract features for parts of a signal.

        Args:
            signal: signal values
            sampling_rate: sampling rate in Hz
            index: a segmented index conform to audformat_
                or a :class:`pandas.MultiIndex` with two levels
                named `start` and `end` that hold start and end
                positions as :class:`pandas.Timedelta` objects.
                See also :func:`audinterface.utils.signal_index`
            process_func_args: (keyword) arguments passed on
                to the processing function.
                They will temporarily overwrite
                the ones stored in
                :attr:`audinterface.SegmentWithFeature.process.process_func_args`

        Returns:
            :class:`pandas.DataFrame` with segmented index conform to audformat_

        Raises:
            RuntimeError: if sampling rates do not match
            RuntimeError: if channel selection is invalid
            ValueError: if index contains duplicates
            ValueError: if the process function doesn't return a
                :class:`pd.Series` with index conform to audformat_
                and elements of shape ``(num_features)``

        .. _audformat: https://audeering.github.io/audformat/data-format.html

        """
        utils.assert_index(index)

        # One task per segment:
        # positional arguments and keyword arguments
        # for ``self.process_signal``.
        # ``process_func_args`` is forwarded in both branches,
        # otherwise it would be ignored for a signal index
        if isinstance(index, pd.MultiIndex) and len(index.levels) == 2:
            has_file_level = False
            params = [
                (
                    (signal, sampling_rate),
                    {
                        "start": start,
                        "end": end,
                        "process_func_args": process_func_args,
                    },
                )
                for start, end in index
            ]
        else:
            has_file_level = True
            index = audformat.utils.to_segmented_index(index)
            params = [
                (
                    (signal, sampling_rate),
                    {
                        "file": file,
                        "start": start,
                        "end": end,
                        "process_func_args": process_func_args,
                    },
                )
                for file, start, end in index
            ]

        y = audeer.run_tasks(
            self.process_signal,
            params,
            num_workers=self.process.num_workers,
            multiprocessing=self.process.multiprocessing,
            progress_bar=self.process.verbose,
            task_description=f"Process {len(index)} segments",
            maximum_refresh_time=1,
        )

        # Concatenate the per-segment frames
        # into a single index and one value list per feature
        files = []
        starts = []
        ends = []
        features = {col: [] for col in self.feature_names}
        for df in y:
            if df is None:
                continue
            if has_file_level:
                files.extend(df.index.get_level_values("file"))
            starts.extend(df.index.get_level_values("start"))
            ends.extend(df.index.get_level_values("end"))
            for col in self.feature_names:
                features[col].extend(df[col])

        if has_file_level:
            index = audformat.segmented_index(files, starts, ends)
        else:
            index = utils.signal_index(starts, ends)

        if len(index) == 0:
            # Pass no data to ensure consistent dtype for columns
            return pd.DataFrame(index=index, columns=self.feature_names)
        return pd.DataFrame(
            index=index,
            data=features,
            columns=self.feature_names,
        )

    def process_table(
        self,
        table: pd.Series | pd.DataFrame,
        *,
        root: str | None = None,
        cache_root: str | None = None,
        process_func_args: dict[str, object] | None = None,
    ) -> pd.DataFrame:
        r"""Segment and extract features for files or segments from a table.

        The labels of the table
        are reassigned to the new segments.
        The columns of the table may not overlap
        with the :attr:`audinterface.SegmentWithFeature.feature_names`.

        If ``cache_root`` is not ``None``,
        a hash value is created from the index
        using :func:`audformat.utils.hash` and
        the result is stored as
        ``<cache_root>/<hash>.pkl``.
        When called again with the same index,
        results will be read from the cached file.

        Args:
            table: :class:`pandas.Series` or :class:`pandas.DataFrame`
                with an index conform to audformat_
            root: root folder to expand relative file paths
            cache_root: cache folder (see description)
            process_func_args: (keyword) arguments passed on
                to the processing function.
                They will temporarily overwrite
                the ones stored in
                :attr:`audinterface.SegmentWithFeature.process.process_func_args`

        Returns:
            :class:`pandas.DataFrame` with segmented index conform to audformat_

        Raises:
            ValueError: if table is not a :class:`pandas.Series`
                or a :class:`pandas.DataFrame`
            ValueError: if the table columns and the
                extracted feature columns overlap
            ValueError: if the process function doesn't return a
                :class:`pd.Series` with index conform to audformat_
                and elements of shape ``(num_features)``
            RuntimeError: if sampling rates do not match
            RuntimeError: if channel selection is invalid

        .. _audformat: https://audeering.github.io/audformat/data-format.html

        """
        if not isinstance(table, (pd.Series, pd.DataFrame)):
            raise ValueError("table has to be pd.Series or pd.DataFrame")

        if isinstance(table, pd.Series):
            if table.name in self.feature_names:
                raise ValueError(
                    "Name of table may not overlap with returned feature names."
                )
        else:
            if any(col in self.feature_names for col in table.columns):
                raise ValueError(
                    "Column names in table may not overlap "
                    "with returned feature names."
                )

        index = audformat.utils.to_segmented_index(table.index)
        utils.assert_index(index)

        y = self.process.process_index(
            index,
            preserve_index=False,
            root=root,
            cache_root=cache_root,
            process_func_args=process_func_args,
        )

        # Assign labels from the original table
        # to the newly created segments
        files = []
        starts = []
        ends = []
        labels = []
        features = {col: [] for col in self.feature_names}
        if isinstance(table, pd.Series):
            table = table.to_frame()

        for n, ((file, start, _), series) in enumerate(y.items()):
            self._check_return_format(series)
            if series.empty:
                continue
            # Transpose to one row of values per feature
            data = self._reshape_numpy(series)
            data = np.stack(data).T
            index_data = series.index.to_numpy()
            files.extend([file] * len(series))
            # Segment positions are relative to the processed segment,
            # hence shift them by its start
            starts.extend([t[0] + start for t in index_data])
            ends.extend([t[1] + start for t in index_data])
            for col, val in zip(self.feature_names, data):
                features[col].extend(val)
            # Repeat the labels of row n for every new segment
            labels.append([table.iloc[n].values] * len(series))

        labels = np.vstack(labels) if labels else np.empty((0, table.shape[1]))
        index = audformat.segmented_index(files, starts, ends)

        if len(index) == 0:
            # Pass no data to ensure consistent dtype for columns
            result = pd.DataFrame(
                index=audformat.segmented_index(),
                columns=self.feature_names,
            )
        else:
            result = pd.DataFrame(
                index=index,
                data=features,
                columns=self.feature_names,
            )

        dtypes = [table[col].dtype for col in table.columns]
        labels = {
            col: pd.Series(
                labels[:, ncol],
                index=index,
                dtype=dtypes[ncol],
            )  # supports also category
            for ncol, col in enumerate(table.columns)
        }
        table = pd.DataFrame(labels, index, columns=table.columns)
        result = result.join(table)
        return result

    def _check_return_format(self, x):
        r"""Validate the object returned by the process function.

        Args:
            x: object returned by the process function

        Raises:
            ValueError: if ``x`` is not a :class:`pandas.Series`
            ValueError: if the index of ``x`` is a filewise index

        """
        if not isinstance(x, pd.Series):
            raise ValueError(f"A series must be returned, but got {type(x)}")
        utils.assert_index(x.index)
        # Disallow filewise index
        if audformat.is_filewise_index(x.index):
            raise ValueError(
                "The returned series must have a signal index or a segmented index"
            )

    def _construct_frame_segmented_index(self, y: pd.Series):
        r"""Construct dataframe with segmented index from process result.

        Args:
            y: series with keys ``(file, start, end)``,
                each holding the series
                returned by the process function for that segment

        Returns:
            data frame with one column per feature name
            and one row per segment found by the process function

        """
        files = []
        starts = []
        ends = []
        features = {col: [] for col in self.feature_names}

        for (file, start, _), series in y.items():
            self._check_return_format(series)
            if series.empty:
                continue
            # Transpose to one row of values per feature
            data = self._reshape_numpy(series)
            data = np.stack(data).T
            index_data = series.index.to_numpy()
            files.extend([file] * len(series))
            # Shift segment positions by the start of the processed segment
            starts.extend([t[0] + start for t in index_data])
            ends.extend([t[1] + start for t in index_data])
            for col, val in zip(self.feature_names, data):
                features[col].extend(val)

        if not files:
            # Pass no data to ensure consistent dtype for columns
            return pd.DataFrame(
                index=audformat.segmented_index(),
                columns=self.feature_names,
            )
        return pd.DataFrame(
            index=audformat.segmented_index(files, starts, ends),
            data=features,
            columns=self.feature_names,
        )

    def _construct_frame_signal_index(self, y: pd.Series):
        r"""Construct dataframe with signal index from process result.

        Args:
            y: series with keys ``(start, end)``,
                each holding the series
                returned by the process function for that segment

        Returns:
            data frame with one column per feature name
            and one row per segment found by the process function

        """
        starts = []
        ends = []
        features = {col: [] for col in self.feature_names}

        for (start, _), series in y.items():
            self._check_return_format(series)
            if series.empty:
                continue
            # Transpose to one row of values per feature
            data = self._reshape_numpy(series)
            data = np.stack(data).T
            index_data = series.index.to_numpy()
            # Shift segment positions by the start of the processed segment
            starts.extend([t[0] + start for t in index_data])
            ends.extend([t[1] + start for t in index_data])
            for col, val in zip(self.feature_names, data):
                features[col].extend(val)

        if not starts:
            # Pass no data to ensure consistent dtype for columns
            return pd.DataFrame(
                index=utils.signal_index(),
                columns=self.feature_names,
            )
        return pd.DataFrame(
            index=utils.signal_index(starts, ends),
            data=features,
            columns=self.feature_names,
        )

    def _reshape_numpy(
        self,
        features: pd.Series,
    ):
        r"""Reshape values in series to numpy array of shape [n_features].

        Args:
            features: series returned by the process function

        Returns:
            one entry of shape ``(num_features,)`` per element of the series,
            as a list for series of object dtype
            and as a two-dimensional array otherwise

        Raises:
            ValueError: if an element cannot be brought into the shape
                ``(num_features,)``

        """
        # Cover case that process function returned a series
        # with numpy arrays as elements
        if pd.api.types.is_object_dtype(features):
            return [self._reshape_numpy_1d(values) for values in features]
        # Case that the process function returned a series
        # with elements that are scalar values.
        # Use to_numpy() directly to preserve dtype of series
        data = features.to_numpy()
        return np.reshape(data, (len(features), self.num_features))

    def _reshape_numpy_1d(
        self,
        features: np.ndarray,
    ):
        r"""Reshape to [n_features].

        Args:
            features: features of a single segment

        Returns:
            array of shape ``(num_features,)``

        Raises:
            ValueError: if ``features`` does not have ``num_features`` entries
                in a single dimension

        """
        features = np.atleast_1d(np.asarray(features))
        if features.shape != (self.num_features,):
            raise ValueError(
                f"The returned features must be reshapable to ({self.num_features})"
            )
        return features

    def __call__(
        self,
        signal: np.ndarray,
        sampling_rate: int,
    ) -> pd.Series:
        r"""Apply processing to signal.

        This function processes the signal **without** transforming the output
        into a :class:`pd.DataFrame`. Instead, it will return the raw processed
        signal. However, if channel selection, mixdown and/or resampling
        is enabled, the signal will be first remixed and resampled if the
        input sampling rate does not fit the expected sampling rate.

        Args:
            signal: signal values
            sampling_rate: sampling rate in Hz

        Returns:
            Processed signal

        Raises:
            RuntimeError: if sampling rates do not match
            RuntimeError: if channel selection is invalid
            ValueError: if the process function doesn't return a
                :class:`pd.Series` with index conform to audformat_
                and elements of shape ``(num_features)``

        .. _audformat: https://audeering.github.io/audformat/data-format.html

        """
        y = self.process(
            signal,
            sampling_rate,
        )
        self._check_return_format(y)
        # Validate and normalize the shape of every element
        return y.apply(self._reshape_numpy_1d)