Source code for audinterface.core.utils

import collections
import os
import typing

import numpy as np
import pandas as pd

import audformat
import audiofile
import audmath
import audresample

from audinterface.core.typing import Timestamp
from audinterface.core.typing import Timestamps


def assert_index(obj: pd.Index):
    r"""Check if index is conform to audformat."""
    if isinstance(obj, pd.MultiIndex) and len(obj.levels) == 2:
        if obj.has_duplicates:
            max_display = 10
            duplicates = obj[obj.duplicated()]
            msg_tail = "\n..." if len(duplicates) > max_display else ""
            msg_duplicates = "\n".join(
                [str(duplicate) for duplicate in duplicates[:max_display].tolist()]
            )
            raise ValueError("Found duplicates:\n" f"{msg_duplicates}{msg_tail}")

        if not (
            obj.names[0] == audformat.define.IndexField.START
            and obj.names[1] == audformat.define.IndexField.END
        ):
            expected_names = [
                audformat.define.IndexField.START,
                audformat.define.IndexField.END,
            ]
            raise ValueError(
                "Found two levels with names "
                f"{obj.names}, "
                f"but expected names "
                f"{expected_names}."
            )
        if not pd.api.types.is_timedelta64_dtype(obj.levels[0].dtype):
            raise ValueError(
                "Level 'start' must contain values of type 'timedelta64[ns]'."
            )
        if not pd.api.types.is_timedelta64_dtype(obj.levels[1].dtype):
            raise ValueError(
                "Level 'end' must contain values of type 'timedelta64[ns]'."
            )
    else:
        audformat.assert_index(obj)


def is_scalar(value: typing.Any) -> bool:
    r"""Check if value is scalar."""
    return (value is not None) and (
        isinstance(value, str) or not hasattr(value, "__len__")
    )


def preprocess_signal(
    signal: np.ndarray,
    sampling_rate: int,
    expected_rate: int,
    resample: bool,
    channels: typing.Union[int, typing.Sequence[int]],
    mixdown: bool,
) -> (np.ndarray, int):
    r"""Pre-process signal."""
    signal = np.atleast_2d(signal)

    if channels is not None or mixdown:
        signal = audresample.remix(signal, channels, mixdown)

    if expected_rate is not None and sampling_rate != expected_rate:
        if resample:
            signal = audresample.resample(
                signal,
                sampling_rate,
                expected_rate,
            )
            sampling_rate = expected_rate
        else:
            raise RuntimeError(
                f"Sampling rate of input signal is "
                f"{sampling_rate} "
                f"but the expected sampling rate is "
                f"{expected_rate} Hz. "
                f"Enable resampling to avoid this error."
            )

    return signal, sampling_rate


[docs]def read_audio( file: str, *, start: pd.Timedelta = None, end: pd.Timedelta = None, root: str = None, ) -> typing.Tuple[np.ndarray, int]: """Reads (segment of an) audio file. Args: file: path to audio file start: read from this position end: read until this position root: root folder Returns: * array with signal values in shape ``(channels, samples)`` * sampling rate in Hz Examples: >>> import audb >>> media = audb.load_media( ... "emodb", ... "wav/03a01Fa.wav", ... version="1.3.0", ... verbose=False, ... ) >>> signal, sampling_rate = read_audio(media[0], end=pd.Timedelta(0.01, unit="s")) >>> signal.shape (1, 160) """ # noqa: E501 if root is not None and not os.path.isabs(file): file = os.path.join(root, file) if start is None or pd.isna(start): offset = 0 else: offset = start.total_seconds() if end is None or pd.isna(end): duration = None else: duration = end.total_seconds() - offset signal, sampling_rate = audiofile.read( file, always_2d=True, offset=offset, duration=duration, ) return signal, sampling_rate
def segment_to_indices( signal: np.ndarray, sampling_rate: int, start: pd.Timedelta, end: pd.Timedelta, ) -> typing.Tuple[int, int]: if pd.isna(end): end = pd.to_timedelta(signal.shape[-1] / sampling_rate, unit="s") max_i = signal.shape[-1] start_i = audmath.samples(start.total_seconds(), sampling_rate) start_i = min(start_i, max_i) end_i = audmath.samples(end.total_seconds(), sampling_rate) end_i = min(end_i, max_i) return start_i, end_i def segments_to_indices( signal: np.ndarray, sampling_rate: int, index: pd.MultiIndex, ) -> typing.Tuple[typing.Sequence[int], typing.Sequence[int]]: starts_i = [0] * len(index) ends_i = [0] * len(index) for idx, (start, end) in enumerate(index): start_i, end_i = segment_to_indices(signal, sampling_rate, start, end) starts_i[idx] = start_i ends_i[idx] = end_i return starts_i, ends_i
[docs]def signal_index( starts: Timestamps = None, ends: Timestamps = None, ) -> pd.MultiIndex: r"""Create signal index. Returns a segmented index like :func:`audformat.segmented_index`, but without the ``'file'`` level. Can be used with the following methods: * :meth:`audinterface.Feature.process_signal_from_index` * :meth:`audinterface.Process.process_signal_from_index` * :meth:`audinterface.ProcessWithContext.process_signal_from_index` * :meth:`audinterface.Segment.process_signal_from_index` Args: starts: segment start positions. Time values given as float or integers are treated as seconds ends: segment end positions. Time values given as float or integers are treated as seconds Returns: index with start and end times Raises: ValueError: if ``start`` and ``ends`` differ in size Examples: >>> signal_index(0, 1.1) MultiIndex([('0 days', '0 days 00:00:01.100000')], names=['start', 'end']) >>> signal_index("0ms", "1ms") MultiIndex([('0 days', '0 days 00:00:00.001000')], names=['start', 'end']) >>> signal_index([None, 1], [1, None]) MultiIndex([( NaT, '0 days 00:00:01'), ('0 days 00:00:01', NaT)], names=['start', 'end']) >>> signal_index( ... starts=[0, 1], ... ends=pd.to_timedelta([1000, 2000], unit="ms"), ... ) MultiIndex([('0 days 00:00:00', '0 days 00:00:01'), ('0 days 00:00:01', '0 days 00:00:02')], names=['start', 'end']) >>> signal_index([0, 1]) MultiIndex([('0 days 00:00:00', NaT), ('0 days 00:00:01', NaT)], names=['start', 'end']) >>> signal_index(ends=[1, 2]) MultiIndex([('0 days', '0 days 00:00:01'), ('0 days', '0 days 00:00:02')], names=['start', 'end']) """ starts = to_array(starts) ends = to_array(ends) if starts is None: if ends is not None: starts = [0] * len(ends) else: starts = [] if ends is None: ends = [pd.NaT] * len(starts) if len(starts) != len(ends): raise ValueError( f"Cannot create index," f"'starts' and 'ends' differ in length: " f"{len(starts)} != {len(ends)}.", ) index = pd.MultiIndex.from_arrays( [ pd.TimedeltaIndex(to_timedelta(starts)), pd.TimedeltaIndex(to_timedelta(ends)), ], names=[ audformat.define.IndexField.START, audformat.define.IndexField.END, ], ) assert_index(index) return index
[docs]def sliding_window( signal: np.ndarray, sampling_rate: int, win_dur: Timestamp, hop_dur: Timestamp, ) -> np.ndarray: r"""Reshape signal by applying a sliding window. Windows that do not match the specified duration at the end of the signals will be dropped. Args: signal: input signal in shape ``(samples,)`` or ``(channels, samples)`` sampling_rate: sampling rate in Hz win_dur: window duration, if value is as a float or integer it is treated as seconds. See :func:`audinterface.utils.to_timedelta` for further options hop_dur: hop duration, if value is as a float or integer it is treated as seconds. See :func:`audinterface.utils.to_timedelta` for further options Returns: view of signal with shape ``(channels, samples, frames)`` Raises: ValueError: if ``win_dur`` or ``hop_dur`` is smaller than ``1/sampling_rate`` Examples: >>> signal = np.array( ... [ ... [0, 1, 2, 3, 4, 5], ... [0, 10, 20, 30, 40, 50], ... ], ... ) >>> signal array([[ 0, 1, 2, 3, 4, 5], [ 0, 10, 20, 30, 40, 50]]) >>> frames = sliding_window( ... signal, ... sampling_rate=1, ... win_dur=3, ... hop_dur=2, ... ) >>> # First frame >>> frames[..., 0] array([[ 0, 1, 2], [ 0, 10, 20]]) >>> # Last frame >>> frames[..., -1] array([[ 2, 3, 4], [20, 30, 40]]) >>> # Mean per frame >>> frames.mean(axis=1) array([[ 1., 3.], [10., 30.]]) """ signal = np.atleast_2d(signal) win_dur = to_timedelta(win_dur, sampling_rate) hop_dur = to_timedelta(hop_dur, sampling_rate) win_length = int(win_dur.total_seconds() * sampling_rate) hop_length = int(hop_dur.total_seconds() * sampling_rate) if win_length <= 0: raise ValueError( f"When the sampling rate is " f"{sampling_rate} " f"Hz the window duration must be at least " f"{1.0/sampling_rate}s, " f"but got " f"{win_dur.total_seconds()}s." ) if hop_length <= 0: raise ValueError( f"When the sampling rate is " f"{sampling_rate} " f"Hz the hop duration must be at least " f"{1.0/sampling_rate}s, " f"but got " f"{hop_dur.total_seconds()}s." ) if signal.shape[1] < win_length: # signal too short return np.array([], dtype=signal.dtype) shape = (signal.shape[0], signal.shape[1] - win_length + 1, win_length) strides = (signal.strides[0], signal.strides[1], signal.strides[1]) frames = np.lib.stride_tricks.as_strided( signal, strides=strides, shape=shape, )[:, 0::hop_length] frames = frames.swapaxes(1, 2) # make frames last axis return frames
def to_array(value: typing.Any) -> np.ndarray: r"""Convert value to numpy array.""" if value is not None: if isinstance(value, (pd.Series, pd.DataFrame, pd.Index)): value = value.to_numpy() elif is_scalar(value): value = np.array([value]) return value
[docs]def to_timedelta( durations: Timestamps, sampling_rate: int = None, ) -> typing.Union[pd.Timedelta, typing.List[pd.Timedelta]]: r"""Convert duration value(s) to :class:`pandas.Timedelta`. The single duration values support all formats mentioned in :func:`audmath.duration_in_seconds`, like ``'2 ms'``, or ``pandas.to_timedelta(2, 's')``. The exception is that float and integer values are always interpreted as seconds and strings without unit always as samples. Args: durations: duration value(s). If value is a float or integer it is treated as seconds. To specify a unit provide as string, e.g. ``'2ms'``. To specify in samples provide as string without unit, e.g. ``'2000'`` sampling_rate: sampling rate in Hz. Needs to be provided if any duration value is provided in samples Returns: duration value(s) as :class:`pandas.Timedelta` objects Raises: ValueError: if a duration value is given in samples, but ``sampling_rate`` is ``None`` ValueError: if a duration is a string that does not match a valid '<value><unit>' pattern or the provided unit is not supported Examples: >>> to_timedelta(2) Timedelta('0 days 00:00:02') >>> to_timedelta(2.0) Timedelta('0 days 00:00:02') >>> to_timedelta("2ms") Timedelta('0 days 00:00:00.002000') >>> to_timedelta("200milliseconds") Timedelta('0 days 00:00:00.200000') >>> to_timedelta([1, "2000"], 1000) [Timedelta('0 days 00:00:01'), Timedelta('0 days 00:00:02')] """ # noqa: E501 def duration_in_seconds(duration, sampling_rate): """Helper function to convert to seconds.""" if not isinstance(duration, str): # force non-string values to represent seconds sampling_rate = None elif all(d.isdigit() for d in duration): # force string without unit to represent samples if sampling_rate is None: raise ValueError( "You have to provide 'sampling_rate' " "when specifying the duration in samples " f"as you did with '{duration}'. " ) return audmath.duration_in_seconds(duration, sampling_rate) if not isinstance(durations, str) and isinstance( durations, collections.abc.Iterable ): # sequence of duration entries durations = [to_timedelta(duration, sampling_rate) for duration in durations] else: # single duration entry # avoid converting Timedelta values to ensure precision # https://github.com/audeering/audinterface/pull/137 if isinstance(durations, pd.Timedelta): return durations durations = duration_in_seconds(durations, sampling_rate) durations = pd.to_timedelta(durations, unit="s") return durations