Source code for audinterface.core.process

import errno
import inspect
import itertools
import os
import typing

import numpy as np
import pandas as pd

import audeer
import audformat
import audmath

from audinterface.core import utils
from audinterface.core.segment import Segment
from audinterface.core.typing import Timestamp
from audinterface.core.typing import Timestamps


def identity(signal, sampling_rate) -> np.ndarray:
    r"""Default processing function.

    This function is used,
    when ``Process`` is instantiated
    with ``process_func=None``.
    It returns the given signal.

    Args:
        signal: signal
        sampling_rate: sampling rate in Hz

    Returns:
        signal

    """
    return signal


[docs]class Process: r"""Processing interface. Args: process_func: processing function, which expects the two positional arguments ``signal`` and ``sampling_rate`` and any number of additional keyword arguments (see ``process_func_args``). There are the following special arguments: ``'idx'``, ``'file'``, ``'root'``. If expected by the function, but not specified in ``process_func_args``, they will be replaced with: a running index, the currently processed file, the root folder. There is no restriction on the return type of the function process_func_args: (keyword) arguments passed on to the processing function process_func_is_mono: if set to ``True`` and the input signal has multiple channels, ``process_func`` will be applied to every channel individually sampling_rate: sampling rate in Hz. If ``None`` it will call ``process_func`` with the actual sampling rate of the signal resample: if ``True`` enforces given sampling rate by resampling channels: channel selection, see :func:`audresample.remix` mixdown: apply mono mix-down on selection win_dur: window duration, if processing should be applied on a sliding window. If value is a float or integer it is treated as seconds. See :func:`audinterface.utils.to_timedelta` for further options hop_dur: hop duration, if processing should be applied on a sliding window. This defines the shift between two windows. If value is a float or integer it is treated as seconds. See :func:`audinterface.utils.to_timedelta` for further options. Defaults to ``win_dur / 2`` min_signal_dur: minimum signal length required by ``process_func``. If value is a float or integer it is treated as seconds. See :func:`audinterface.utils.to_timedelta` for further options. If provided signal is shorter, it will be zero padded at the end max_signal_dur: maximum signal length required by ``process_func``. If value is a float or integer it is treated as seconds. See :func:`audinterface.utils.to_timedelta` for further options. If provided signal is longer, it will be cut at the end segment: when a :class:`audinterface.Segment` object is provided, it will be used to find a segmentation of the input signal. Afterwards processing is applied to each segment keep_nat: if the end of segment is set to ``NaT`` do not replace with file duration in the result num_workers: number of parallel jobs or 1 for sequential processing. If ``None`` will be set to the number of processors on the machine multiplied by 5 in case of multithreading and number of processors in case of multiprocessing multiprocessing: use multiprocessing instead of multithreading verbose: show debug messages Raises: ValueError: if ``resample = True``, but ``sampling_rate = None`` ValueError: if ``hop_dur`` is specified, but not ``win_dur`` Examples: >>> def mean(signal, sampling_rate): ... return signal.mean() >>> interface = Process(process_func=mean) >>> signal = np.array([1.0, 2.0, 3.0]) >>> interface(signal, sampling_rate=3) 2.0 >>> interface.process_signal(signal, sampling_rate=3) start end 0 days 0 days 00:00:01 2.0 dtype: float64 >>> # Apply interface on an audformat conform index of a dataframe >>> import audb >>> db = audb.load( ... "emodb", ... version="1.3.0", ... media="wav/03a01Fa.wav", ... full_path=False, ... verbose=False, ... ) >>> index = db["emotion"].index >>> interface.process_index(index, root=db.root) file start end wav/03a01Fa.wav 0 days 0 days 00:00:01.898250 -0.000311 dtype: float32 >>> interface.process_index(index, root=db.root, preserve_index=True) file wav/03a01Fa.wav -0.000311 dtype: float32 >>> # Apply interface with a sliding window >>> interface = Process( ... process_func=mean, ... win_dur=1.0, ... hop_dur=0.5, ... ) >>> interface.process_index(index, root=db.root) file start end wav/03a01Fa.wav 0 days 00:00:00 0 days 00:00:01 -0.000329 0 days 00:00:00.500000 0 days 00:00:01.500000 -0.000285 dtype: float32 """ # noqa: E501 def __init__( self, *, process_func: typing.Callable[..., typing.Any] = None, process_func_args: typing.Dict[str, typing.Any] = None, process_func_is_mono: bool = False, sampling_rate: int = None, resample: bool = False, channels: typing.Union[int, typing.Sequence[int]] = None, mixdown: bool = False, win_dur: Timestamp = None, hop_dur: Timestamp = None, min_signal_dur: Timestamp = None, max_signal_dur: Timestamp = None, segment: Segment = None, keep_nat: bool = False, num_workers: typing.Optional[int] = 1, multiprocessing: bool = False, verbose: bool = False, ): if channels is not None: channels = audeer.to_list(channels) if resample and sampling_rate is None: raise ValueError("sampling_rate has to be provided for resample = True.") if win_dur is None and hop_dur is not None: raise ValueError("You have to specify 'win_dur' if 'hop_dur' is given.") if win_dur is not None and hop_dur is None: hop_dur = utils.to_timedelta(win_dur, sampling_rate) / 2 process_func = process_func or identity signature = inspect.signature(process_func) self._process_func_signature = dict(signature.parameters) r"""Arguments present in processing function.""" self.channels = channels r"""Channel selection.""" self.keep_nat = keep_nat r"""Keep NaT in results.""" self.hop_dur = hop_dur r"""Hop duration.""" self.max_signal_dur = max_signal_dur r"""Maximum signal length.""" self.min_signal_dur = min_signal_dur r"""Minimum signal length.""" self.mixdown = mixdown r"""Mono mixdown.""" self.multiprocessing = multiprocessing r"""Use multiprocessing.""" self.num_workers = num_workers r"""Number of workers.""" self.process_func = process_func r"""Processing function.""" self.process_func_args = process_func_args or {} r"""Additional keyword arguments to processing function.""" self.process_func_is_mono = process_func_is_mono r"""Process channels individually.""" self.resample = resample r"""Resample signal.""" self.sampling_rate = sampling_rate r"""Sampling rate in Hz.""" self.segment = segment r"""Segmentation object.""" self.verbose = verbose r"""Show debug messages.""" self.win_dur = win_dur r"""Window duration.""" def _process_file( self, file: str, *, idx: int = 0, root: str = None, start: pd.Timedelta = None, end: pd.Timedelta = None, process_func_args: typing.Dict[str, typing.Any] = None, ) -> typing.Tuple[ typing.List[typing.Any], typing.List[str], typing.List[pd.Timedelta], typing.List[pd.Timedelta], ]: if start is not None: start = utils.to_timedelta(start, self.sampling_rate) if end is not None: end = utils.to_timedelta(end, self.sampling_rate) signal, sampling_rate = utils.read_audio( file, start=start, end=end, root=root, ) y, files, starts, ends = self._process_signal( signal, sampling_rate, idx=idx, root=root, file=file, process_func_args=process_func_args, ) def precision_offset(duration, sampling_rate): # Ensure we get the same precision # by storing what is lost due to rounding # when reading the file duration_at_sample = utils.to_timedelta( audmath.samples(duration.total_seconds(), sampling_rate) / sampling_rate ) return duration - duration_at_sample if self.win_dur is not None: if start is not None: starts = starts + start ends = ends + start else: if start is not None and not pd.isna(start): starts[0] += start ends[0] += start - precision_offset(start, sampling_rate) if self.keep_nat and (end is None or pd.isna(end)): ends[0] = pd.NaT if end is not None and not pd.isna(end): ends[-1] += precision_offset(end, sampling_rate) return y, files, starts, ends
[docs] def process_file( self, file: str, *, start: Timestamp = None, end: Timestamp = None, root: str = None, process_func_args: typing.Dict[str, typing.Any] = None, ) -> pd.Series: r"""Process the content of an audio file. Args: file: file path start: start processing at this position. If value is a float or integer it is treated as seconds. See :func:`audinterface.utils.to_timedelta` for further options end: end processing at this position. If value is a float or integer it is treated as seconds. See :func:`audinterface.utils.to_timedelta` for further options root: root folder to expand relative file path process_func_args: (keyword) arguments passed on to the processing function. They will temporarily overwrite the ones stored in :attr:`audinterface.Process.process_func_args` Returns: Series with processed file conform to audformat_ Raises: RuntimeError: if sampling rates do not match RuntimeError: if channel selection is invalid .. _audformat: https://audeering.github.io/audformat/data-format.html """ if self.segment is not None: index = self.segment.process_file( file, start=start, end=end, root=root, ) return self._process_index_wo_segment(index, root) else: y, files, starts, ends = self._process_file( file, root=root, start=start, end=end, process_func_args=process_func_args, ) index = audformat.segmented_index(files, starts, ends) if len(y) == 0: return pd.Series([], index, dtype=object) else: return pd.Series(y, index)
[docs] def process_files( self, files: typing.Sequence[str], *, starts: Timestamps = None, ends: Timestamps = None, root: str = None, process_func_args: typing.Dict[str, typing.Any] = None, ) -> pd.Series: r"""Process a list of files. Args: files: list of file paths starts: segment start positions. Time values given as float or integers are treated as seconds. See :func:`audinterface.utils.to_timedelta` for further options. If a scalar is given, it is applied to all files ends: segment end positions. Time values given as float or integers are treated as seconds. See :func:`audinterface.utils.to_timedelta` for further options. If a scalar is given, it is applied to all files root: root folder to expand relative file paths process_func_args: (keyword) arguments passed on to the processing function. They will temporarily overwrite the ones stored in :attr:`audinterface.Process.process_func_args` Returns: Series with processed files conform to audformat_ Raises: RuntimeError: if sampling rates do not match RuntimeError: if channel selection is invalid .. _audformat: https://audeering.github.io/audformat/data-format.html """ if len(files) == 0: return pd.Series(dtype=object) if self.segment is not None: index = self.segment.process_files( files, starts=starts, ends=ends, root=root, ) return self._process_index_wo_segment( index, root, process_func_args=process_func_args, ) if isinstance(starts, (type(None), float, int, str, pd.Timedelta)): starts = [starts] * len(files) if isinstance(ends, (type(None), float, int, str, pd.Timedelta)): ends = [ends] * len(files) params = [ ( (file,), { "idx": idx, "root": root, "start": start, "end": end, "process_func_args": process_func_args, }, ) for idx, (file, start, end) in enumerate(zip(files, starts, ends)) ] verbose = self.verbose self.verbose = False # avoid nested progress bar xs = audeer.run_tasks( self._process_file, params, num_workers=self.num_workers, multiprocessing=self.multiprocessing, progress_bar=verbose, task_description=f"Process {len(files)} files", ) self.verbose = verbose y = list(itertools.chain.from_iterable([x[0] for x in xs])) files = list(itertools.chain.from_iterable([x[1] for x in xs])) starts = list(itertools.chain.from_iterable([x[2] for x in xs])) ends = list(itertools.chain.from_iterable([x[3] for x in xs])) index = audformat.segmented_index(files, starts, ends) y = pd.Series(y, index) return y
[docs] def process_folder( self, root: str, *, filetype: str = "wav", include_root: bool = True, process_func_args: typing.Dict[str, typing.Any] = None, ) -> pd.Series: r"""Process files in a folder. .. note:: At the moment does not scan in sub-folders! Args: root: root folder filetype: file extension include_root: if ``True`` the file paths are absolute in the index of the returned result process_func_args: (keyword) arguments passed on to the processing function. They will temporarily overwrite the ones stored in :attr:`audinterface.Process.process_func_args` Returns: Series with processed files conform to audformat_ Raises: FileNotFoundError: if folder does not exist RuntimeError: if sampling rates do not match RuntimeError: if channel selection is invalid .. _audformat: https://audeering.github.io/audformat/data-format.html """ root = audeer.path(root) if not os.path.exists(root): raise FileNotFoundError( errno.ENOENT, os.strerror(errno.ENOENT), root, ) files = audeer.list_file_names( root, filetype=filetype, basenames=not include_root, ) return self.process_files( files, root=root, process_func_args=process_func_args, )
def _process_index_wo_segment( self, index: pd.Index, root: typing.Optional[str], process_func_args: typing.Dict[str, typing.Any] = None, ) -> pd.Series: r"""Like process_index, but does not apply segmentation.""" if index.empty: return pd.Series(None, index=index, dtype=object) params = [ ( (file,), { "idx": idx, "root": root, "start": start, "end": end, "process_func_args": process_func_args, }, ) for idx, (file, start, end) in enumerate(index) ] xs = audeer.run_tasks( self._process_file, params, num_workers=self.num_workers, multiprocessing=self.multiprocessing, progress_bar=self.verbose, task_description=f"Process {len(index)} segments", ) y = list(itertools.chain.from_iterable([x[0] for x in xs])) files = list(itertools.chain.from_iterable([x[1] for x in xs])) starts = list(itertools.chain.from_iterable([x[2] for x in xs])) ends = list(itertools.chain.from_iterable([x[3] for x in xs])) index = audformat.segmented_index(files, starts, ends) y = pd.Series(y, index) return y
[docs] def process_index( self, index: pd.Index, *, preserve_index: bool = False, root: str = None, cache_root: str = None, process_func_args: typing.Dict[str, typing.Any] = None, ) -> pd.Series: r"""Process from an index conform to audformat_. If ``cache_root`` is not ``None``, a hash value is created from the index using :func:`audformat.utils.hash` and the result is stored as ``<cache_root>/<hash>.pkl``. When called again with the same index, results will be read from the cached file. Args: index: index with segment information preserve_index: if ``True`` and :attr:`audinterface.Process.segment` is ``None`` the returned index will be of same type as the original one, otherwise always a segmented index is returned root: root folder to expand relative file paths cache_root: cache folder (see description) process_func_args: (keyword) arguments passed on to the processing function. They will temporarily overwrite the ones stored in :attr:`audinterface.Process.process_func_args` Returns: Series with processed segments conform to audformat_ Raises: RuntimeError: if sampling rates do not match RuntimeError: if channel selection is invalid .. _audformat: https://audeering.github.io/audformat/data-format.html """ cache_path = None if cache_root is not None: cache_root = audeer.mkdir(cache_root) hash = audformat.utils.hash(index) cache_path = os.path.join(cache_root, f"{hash}.pkl") if cache_path and os.path.exists(cache_path): y = pd.read_pickle(cache_path) else: segmented_index = audformat.utils.to_segmented_index(index) if self.segment is not None: segmented_index = self.segment.process_index( segmented_index, root=root, ) y = self._process_index_wo_segment( segmented_index, root, process_func_args=process_func_args, ) if cache_path is not None: y.to_pickle(cache_path, protocol=4) if self.segment is None and preserve_index: # Convert segmented index to filewise index # if original index was filewise y.index = index return y
def _process_signal( self, signal: np.ndarray, sampling_rate: int, *, idx: int = 0, root: str = None, file: str = None, start: pd.Timedelta = None, end: pd.Timedelta = None, process_func_args: typing.Dict[str, typing.Any] = None, ) -> typing.Tuple[ typing.List[typing.Any], typing.List[str], typing.List[pd.Timedelta], typing.List[pd.Timedelta], ]: signal = np.atleast_2d(signal) # Find start and end index if start is None or pd.isna(start): start = pd.to_timedelta(0) if end is None or (pd.isna(end) and not self.keep_nat): end = pd.to_timedelta(signal.shape[-1] / sampling_rate, unit="s") start_i, end_i = utils.segment_to_indices( signal, sampling_rate, start, end, ) # Trim signal and ensure it has requested min/max length signal = signal[:, start_i:end_i] num_samples = signal.shape[1] if self.max_signal_dur is not None: max_signal_dur = utils.to_timedelta( self.max_signal_dur, sampling_rate, ) max_samples = int(max_signal_dur.total_seconds() * sampling_rate) if num_samples > max_samples: end = start + max_signal_dur signal = signal[:, :max_samples] if self.min_signal_dur is not None: min_signal_dur = utils.to_timedelta( self.min_signal_dur, sampling_rate, ) min_samples = int(min_signal_dur.total_seconds() * sampling_rate) if num_samples < min_samples: end = start + min_signal_dur num_pad = min_samples - num_samples signal = np.pad(signal, ((0, 0), (0, num_pad)), "constant") # Process signal y = self._call( signal, sampling_rate, idx=idx, root=root, file=file, process_func_args=process_func_args, ) # Create index if self.win_dur is not None: win_dur = utils.to_timedelta(self.win_dur, sampling_rate) hop_dur = utils.to_timedelta(self.hop_dur, sampling_rate) starts = pd.timedelta_range( start, freq=hop_dur, periods=len(y), ) ends = starts + win_dur else: starts = [start] ends = [end] y = [y] return y, [file] * len(starts), starts, ends
[docs] def process_signal( self, signal: np.ndarray, sampling_rate: int, *, file: str = None, start: Timestamp = None, end: Timestamp = None, process_func_args: typing.Dict[str, typing.Any] = None, ) -> pd.Series: r"""Process audio signal and return result. .. note:: If a ``file`` is given, the index of the returned frame has levels ``file``, ``start`` and ``end``. Otherwise, it consists only of ``start`` and ``end``. Args: signal: signal values sampling_rate: sampling rate in Hz file: file path start: start processing at this position. If value is a float or integer it is treated as seconds. See :func:`audinterface.utils.to_timedelta` for further options end: end processing at this position. If value is a float or integer it is treated as seconds. See :func:`audinterface.utils.to_timedelta` for further options process_func_args: (keyword) arguments passed on to the processing function. They will temporarily overwrite the ones stored in :attr:`audinterface.Process.process_func_args` Returns: Series with processed signal conform to audformat_ Raises: RuntimeError: if sampling rates do not match RuntimeError: if channel selection is invalid .. _audformat: https://audeering.github.io/audformat/data-format.html """ if self.segment is not None: index = self.segment.process_signal( signal, sampling_rate, file=file, start=start, end=end, ) return self._process_signal_from_index_wo_segment( signal, sampling_rate, index, process_func_args=process_func_args, ) else: if start is not None: start = utils.to_timedelta(start, sampling_rate) if end is not None: end = utils.to_timedelta(end, sampling_rate) y, files, starts, ends = self._process_signal( signal, sampling_rate, file=file, start=start, end=end, process_func_args=process_func_args, ) if file is not None: index = audformat.segmented_index(files, starts, ends) else: index = utils.signal_index(starts, ends) if len(y) == 0: return pd.Series([], index, dtype=object) else: return pd.Series(y, index)
def _process_signal_from_index_wo_segment( self, signal: np.ndarray, sampling_rate: int, index: pd.Index, process_func_args: typing.Dict[str, typing.Any] = None, ) -> pd.Series: r"""Like process_signal_from_index, but does not apply segmentation.""" if index.empty: return pd.Series(None, index=index, dtype=object) skip_file_level = isinstance(index, pd.MultiIndex) and len(index.levels) == 2 if skip_file_level: params = [ ( (signal, sampling_rate), { "idx": idx, "start": start, "end": end, "process_func_args": process_func_args, }, ) for idx, (start, end) in enumerate(index) ] else: index = audformat.utils.to_segmented_index(index) params = [ ( (signal, sampling_rate), { "idx": idx, "file": file, "start": start, "end": end, "process_func_args": process_func_args, }, ) for idx, (file, start, end) in enumerate(index) ] xs = audeer.run_tasks( self._process_signal, params, num_workers=self.num_workers, multiprocessing=self.multiprocessing, progress_bar=self.verbose, task_description=f"Process {len(index)} segments", ) y = list(itertools.chain.from_iterable([x[0] for x in xs])) starts = list(itertools.chain.from_iterable([x[2] for x in xs])) ends = list(itertools.chain.from_iterable([x[3] for x in xs])) if skip_file_level: index = utils.signal_index(starts, ends) else: files = list(itertools.chain.from_iterable([x[1] for x in xs])) index = audformat.segmented_index(files, starts, ends) y = pd.Series(y, index) return y
[docs] def process_signal_from_index( self, signal: np.ndarray, sampling_rate: int, index: pd.Index, process_func_args: typing.Dict[str, typing.Any] = None, ) -> pd.Series: r"""Split a signal into segments and process each segment. Args: signal: signal values sampling_rate: sampling rate in Hz index: a segmented index conform to audformat_ or a :class:`pandas.MultiIndex` with two levels named `start` and `end` that hold start and end positions as :class:`pandas.Timedelta` objects. See also :func:`audinterface.utils.signal_index` process_func_args: (keyword) arguments passed on to the processing function. They will temporarily overwrite the ones stored in :attr:`audinterface.Process.process_func_args` Returns: Series with processed segments conform to audformat_ Raises: RuntimeError: if sampling rates do not match RuntimeError: if channel selection is invalid ValueError: if index contains duplicates .. _audformat: https://audeering.github.io/audformat/data-format.html """ utils.assert_index(index) if index.empty: return pd.Series(None, index=index, dtype=object) if self.segment is not None: index = self.segment.process_signal_from_index( signal, sampling_rate, index, ) return self._process_signal_from_index_wo_segment( signal, sampling_rate, index, process_func_args=process_func_args, )
def _call( self, signal: np.ndarray, sampling_rate: int, *, idx: int = 0, root: str = None, file: str = None, process_func_args: typing.Dict[str, typing.Any] = None, ) -> typing.Any: r"""Call processing function, possibly pass special args.""" signal, sampling_rate = utils.preprocess_signal( signal, sampling_rate, self.sampling_rate, self.resample, self.channels, self.mixdown, ) process_func_args = process_func_args or self.process_func_args special_args = {} for key, value in [ ("idx", idx), ("root", root), ("file", file), ]: if key in self._process_func_signature and key not in process_func_args: special_args[key] = value def _helper(x): if self.process_func_is_mono: return [ self.process_func( np.atleast_2d(channel), sampling_rate, **special_args, **process_func_args, ) for channel in x ] else: return self.process_func( x, sampling_rate, **special_args, **process_func_args, ) if self.win_dur is not None: frames = utils.sliding_window( signal, sampling_rate, self.win_dur, self.hop_dur, ) num_frames = frames.shape[-1] y = [_helper(frames[..., idx]) for idx in range(num_frames)] else: y = _helper(signal) return y
[docs] def __call__( self, signal: np.ndarray, sampling_rate: int, ) -> typing.Any: r"""Apply processing to signal. This function processes the signal **without** transforming the output into a :class:`pd.Series`. Instead, it will return the raw processed signal. However, if channel selection, mixdown and/or resampling is enabled, the signal will be first remixed and resampled if the input sampling rate does not fit the expected sampling rate. Args: signal: signal values sampling_rate: sampling rate in Hz Returns: Processed signal Raises: RuntimeError: if sampling rates do not match RuntimeError: if channel selection is invalid """ return self._call(signal, sampling_rate)