import errno
import inspect
import os
import typing
import numpy as np
import pandas as pd
import audeer
import audformat
from audinterface.core.process import Process
from audinterface.core.segment import Segment
from audinterface.core.typing import Timestamp
from audinterface.core.typing import Timestamps
import audinterface.core.utils as utils
def zeros(signal, sampling_rate, num_channels, num_features) -> np.ndarray:
r"""Default feature process function.
This function is used,
when ``Feature`` is instantiated
with ``process_func=None``.
It returns zeros for all channels and features.
Args:
signal: signal
sampling_rate: sampling rate in Hz
num_channels: number of audio channels
num_features: number of features
Returns:
zeros with size ``(num_channels, num_features)``
"""
return np.zeros(
(num_channels, num_features),
dtype=object,
)
[docs]class Feature:
r"""Feature extraction interface.
The features are returned as a :class:`pandas.DataFrame`.
If your input signal is of size ``(num_channels, num_time_steps)``,
the returned object has ``num_channels * num_features`` columns.
It will have one row per file or signal.
If features are extracted using a sliding window,
each window will be stored as one row.
If ``win_dur`` is specified ``start`` and ``end`` indices
are referred from the original ``start`` and ``end`` arguments
and the window positions.
If ``win_dur`` is ``None``,
the original ``start`` and ``end`` indices are kept.
If
``process_func_applies_sliding_window``
is set to ``True``
the processing function
is responsible to apply the sliding window.
Otherwise,
the sliding window is applied before
the processing function is called.
If the arguments
``win_dur`` and ``hop_dur``
are not specified in
``process_func_args``,
but
``process_func``
expects them,
they are passed on automatically.
Args:
feature_names: features are stored as columns in a data frame,
where ``feature_names`` defines the names of the columns.
If ``len(channels)`` > 1,
the data frame has a multi-column index with
with channel ID as first level
and ``feature_names`` as second level
name: name of the feature set, e.g. ``'stft'``
params: parameters that describe the feature set,
e.g. ``{'win_size': 512, 'hop_size': 256, 'num_fft': 512}``.
With the parameters you can differentiate different flavors of
the same feature set
process_func: feature extraction function,
which expects the two positional arguments ``signal``
and ``sampling_rate``
and any number of additional keyword arguments
(see ``process_func_args``).
There are the following special arguments:
``'idx'``, ``'file'``, ``'root'``.
If expected by the function,
but not specified in
``process_func_args``,
they will be replaced with:
a running index,
the currently processed file,
the root folder.
The function must return features in the shape of
``(num_features)``,
``(num_channels, num_features)``,
``(num_features, num_frames)``,
or ``(num_channels, num_features, num_frames)``
process_func_args: (keyword) arguments passed on to the processing
function
process_func_is_mono: apply ``process_func`` to every channel
individually
process_func_applies_sliding_window:
if ``True``
the processing function receives
whole files or segments and is responsible
for applying a sliding window itself.
If ``False``,
the sliding window is applied internally
and the processing function
receives individual frames instead.
Applies only if
features are extracted in a framewise manner
(see ``win_dur`` and ``hop_dur``)
sampling_rate: sampling rate in Hz.
If ``None`` it will call ``process_func`` with the actual
sampling rate of the signal
resample: if ``True`` enforces given sampling rate by resampling
channels: channel selection, see :func:`audresample.remix`
win_dur: window duration,
if features are extracted with a sliding window.
If value is a float or integer
it is treated as seconds.
See :func:`audinterface.utils.to_timedelta` for further options
hop_dur: hop duration,
if features are extracted with a sliding window.
This defines the shift between two windows.
If value is a float or integer
it is treated as seconds.
See :func:`audinterface.utils.to_timedelta` for further options.
Defaults to ``win_dur / 2``
min_signal_dur: minimum signal duration
required by ``process_func``.
If value is a float or integer
it is treated as seconds.
See :func:`audinterface.utils.to_timedelta` for further options.
If provided signal is shorter,
it will be zero padded at the end
max_signal_dur: maximum signal duraton
required by ``process_func``.
If value is a float or integer
it is treated as seconds.
See :func:`audinterface.utils.to_timedelta` for further options.
If provided signal is longer,
it will be cut at the end
mixdown: apply mono mix-down on selection
segment: when a :class:`audinterface.Segment` object is provided,
it will be used to find a segmentation of the input signal.
Afterwards processing is applied to each segment
keep_nat: if the end of segment is set to ``NaT`` do not replace
with file duration in the result
num_workers: number of parallel jobs or 1 for sequential
processing. If ``None`` will be set to the number of
processors on the machine multiplied by 5 in case of
multithreading and number of processors in case of
multiprocessing
multiprocessing: use multiprocessing instead of multithreading
verbose: show debug messages
Raises:
ValueError: if ``win_dur`` or ``hop_dur`` are given in samples
and ``sampling_rate is None``
ValueError: if ``hop_dur`` is specified, but not ``win_dur``
Examples:
>>> def mean_std(signal, sampling_rate):
... return [signal.mean(), signal.std()]
>>> interface = Feature(["mean", "std"], process_func=mean_std)
>>> signal = np.array([1.0, 2.0, 3.0])
>>> interface(signal, sampling_rate=3)
array([[[2. ],
[0.81649658]]])
>>> interface.process_signal(signal, sampling_rate=3)
mean std
start end
0 days 0 days 00:00:01 2.0 0.816497
>>> # Apply interface on an audformat conform index of a dataframe
>>> import audb
>>> db = audb.load(
... "emodb",
... version="1.3.0",
... media="wav/03a01Fa.wav",
... full_path=False,
... verbose=False,
... )
>>> index = db["emotion"].index
>>> interface.process_index(index, root=db.root)
mean std
file start end
wav/03a01Fa.wav 0 days 0 days 00:00:01.898250 -0.000311 0.082317
>>> interface.process_index(index, root=db.root, preserve_index=True)
mean std
file
wav/03a01Fa.wav -0.000311 0.082317
>>> # Apply interface with a sliding window
>>> interface = Feature(
... ["mean", "std"],
... process_func=mean_std,
... win_dur=1.0,
... hop_dur=0.25,
... )
>>> interface.process_index(index, root=db.root)
mean std
file start end
wav/03a01Fa.wav 0 days 00:00:00 0 days 00:00:01 -0.000329 0.098115
0 days 00:00:00.250000 0 days 00:00:01.250000 -0.000405 0.087917
0 days 00:00:00.500000 0 days 00:00:01.500000 -0.000285 0.067042
0 days 00:00:00.750000 0 days 00:00:01.750000 -0.000187 0.063677
>>> # Apply the same process function on all channels
>>> # of a multi-channel signal
>>> import audiofile
>>> signal, sampling_rate = audiofile.read(
... audeer.path(db.root, db.files[0]),
... always_2d=True,
... )
>>> signal_multi_channel = np.concatenate(
... [
... signal - 0.5,
... signal + 0.5,
... ],
... )
>>> interface = Feature(
... ["mean", "std"],
... process_func=mean_std,
... process_func_is_mono=True,
... channels=[0, 1],
... )
>>> interface.process_signal(
... signal_multi_channel,
... sampling_rate,
... )
0 1
mean std mean std
start end
0 days 0 days 00:00:01.898250 -0.500311 0.082317 0.499689 0.082317
""" # noqa: E501
def __init__(
self,
feature_names: typing.Union[str, typing.Sequence[str]],
*,
name: str = None,
params: typing.Dict = None,
process_func: typing.Callable[..., typing.Any] = None,
process_func_args: typing.Dict[str, typing.Any] = None,
process_func_is_mono: bool = False,
process_func_applies_sliding_window: bool = False,
sampling_rate: int = None,
resample: bool = False,
channels: typing.Union[int, typing.Sequence[int]] = 0,
mixdown: bool = False,
win_dur: Timestamp = None,
hop_dur: Timestamp = None,
min_signal_dur: Timestamp = None,
max_signal_dur: Timestamp = None,
segment: Segment = None,
keep_nat: bool = False,
num_workers: typing.Optional[int] = 1,
multiprocessing: bool = False,
verbose: bool = False,
):
if mixdown or isinstance(channels, int):
num_channels = 1
else:
num_channels = len(channels)
feature_names = audeer.to_list(feature_names)
if num_channels > 1:
column_names = []
for channel in channels:
column_names.extend(
[(channel, feature_name) for feature_name in feature_names]
)
column_names = pd.MultiIndex.from_tuples(column_names)
else:
column_names = pd.Index(feature_names)
process_func_args = process_func_args or {}
if process_func is None:
process_func_args["num_channels"] = num_channels
process_func_args["num_features"] = len(feature_names)
process_func = zeros
if win_dur is None and hop_dur is not None:
raise ValueError("You have to specify 'win_dur' if 'hop_dur' is given.")
if win_dur is not None and hop_dur is None:
hop_dur = utils.to_timedelta(win_dur, sampling_rate) / 2
# add 'win_dur' and 'hop_dur' to process_func_args
# if expected by function but not yet set
signature = inspect.signature(process_func)
if "win_dur" in signature.parameters and "win_dur" not in process_func_args:
process_func_args["win_dur"] = win_dur
if "hop_dur" in signature.parameters and "hop_dur" not in process_func_args:
process_func_args["hop_dur"] = hop_dur
process = Process(
process_func=process_func,
process_func_args=process_func_args,
process_func_is_mono=process_func_is_mono,
sampling_rate=sampling_rate,
resample=resample,
channels=channels,
mixdown=mixdown,
win_dur=None if process_func_applies_sliding_window else win_dur,
hop_dur=None if process_func_applies_sliding_window else hop_dur,
min_signal_dur=min_signal_dur,
max_signal_dur=max_signal_dur,
segment=segment,
keep_nat=keep_nat,
num_workers=num_workers,
multiprocessing=multiprocessing,
verbose=verbose,
)
self.column_names = column_names
r"""Feature column names."""
self.feature_names = feature_names
r"""Feature names."""
self.hop_dur = hop_dur
r"""Hop duration."""
self.name = name
r"""Name of the feature set."""
self.num_channels = num_channels
r"""Expected number of channels"""
self.num_features = len(feature_names)
r"""Number of features."""
self.params = params
r"""Dictionary of parameters describing the feature set."""
self.process = process
r"""Processing object."""
self.process_func_applies_sliding_window = process_func_applies_sliding_window
r"""Controls if processing function applies sliding window."""
self.verbose = verbose
r"""Show debug messages."""
self.win_dur = win_dur
r"""Window duration."""
[docs] def process_file(
self,
file: str,
*,
start: Timestamp = None,
end: Timestamp = None,
root: str = None,
process_func_args: typing.Dict[str, typing.Any] = None,
) -> pd.DataFrame:
r"""Extract features from an audio file.
Args:
file: file path
start: start processing at this position.
If value is a float or integer it is treated as seconds.
See :func:`audinterface.utils.to_timedelta` for further options
end: end processing at this position.
If value is a float or integer it is treated as seconds.
See :func:`audinterface.utils.to_timedelta` for further options
root: root folder to expand relative file path
process_func_args: (keyword) arguments passed on
to the processing function.
They will temporarily overwrite
the ones stored in
:attr:`audinterface.Feature.process.process_func_args`
Raises:
RuntimeError: if sampling rates do not match
RuntimeError: if channel selection is invalid
RuntimeError: if multiple frames are returned,
but ``win_dur`` is not set
"""
series = self.process.process_file(
file,
start=start,
end=end,
root=root,
process_func_args=process_func_args,
)
return self._series_to_frame(series)
[docs] def process_files(
self,
files: typing.Sequence[str],
*,
starts: Timestamps = None,
ends: Timestamps = None,
root: str = None,
process_func_args: typing.Dict[str, typing.Any] = None,
) -> pd.DataFrame:
r"""Extract features for a list of files.
Args:
files: list of file paths
starts: segment start positions.
Time values given as float or integers are treated as seconds.
See :func:`audinterface.utils.to_timedelta`
for further options.
If a scalar is given, it is applied to all files
ends: segment end positions.
Time values given as float or integers are treated as seconds.
See :func:`audinterface.utils.to_timedelta`
for further options.
If a scalar is given, it is applied to all files
root: root folder to expand relative file paths
process_func_args: (keyword) arguments passed on
to the processing function.
They will temporarily overwrite
the ones stored in
:attr:`audinterface.Feature.process.process_func_args`
Raises:
RuntimeError: if sampling rates do not match
RuntimeError: if channel selection is invalid
RuntimeError: if multiple frames are returned,
but ``win_dur`` is not set
"""
series = self.process.process_files(
files,
starts=starts,
ends=ends,
root=root,
process_func_args=process_func_args,
)
return self._series_to_frame(series)
[docs] def process_folder(
self,
root: str,
*,
filetype: str = "wav",
include_root: bool = True,
process_func_args: typing.Dict[str, typing.Any] = None,
) -> pd.DataFrame:
r"""Extract features from files in a folder.
.. note:: At the moment does not scan in sub-folders!
Args:
root: root folder
filetype: file extension
include_root: if ``True``
the file paths are absolute
in the index
of the returned result
process_func_args: (keyword) arguments passed on
to the processing function.
They will temporarily overwrite
the ones stored in
:attr:`audinterface.Feature.process.process_func_args`
Raises:
FileNotFoundError: if folder does not exist
RuntimeError: if sampling rates do not match
RuntimeError: if channel selection is invalid
RuntimeError: if multiple frames are returned,
but ``win_dur`` is not set
"""
root = audeer.path(root)
if not os.path.exists(root):
raise FileNotFoundError(
errno.ENOENT,
os.strerror(errno.ENOENT),
root,
)
files = audeer.list_file_names(
root,
filetype=filetype,
basenames=not include_root,
)
return self.process_files(
files,
root=root,
process_func_args=process_func_args,
)
[docs] def process_index(
self,
index: pd.Index,
*,
preserve_index: bool = False,
root: str = None,
cache_root: str = None,
process_func_args: typing.Dict[str, typing.Any] = None,
) -> pd.DataFrame:
r"""Extract features from an index conform to audformat_.
If ``cache_root`` is not ``None``,
a hash value is created from the index
using :func:`audformat.utils.hash` and
the result is stored as
``<cache_root>/<hash>.pkl``.
When called again with the same index,
features will be read from the cached file.
.. _audformat: https://audeering.github.io/audformat/data-format.html
Args:
index: index with segment information
preserve_index: if ``True``
and :attr:`audinterface.Feature.process.segment` is ``None``
the returned index
will be of same type
as the original one,
otherwise always a segmented index is returned
root: root folder to expand relative file paths
cache_root: cache folder (see description)
process_func_args: (keyword) arguments passed on
to the processing function.
They will temporarily overwrite
the ones stored in
:attr:`audinterface.Feature.process.process_func_args`
Raises:
RuntimeError: if sampling rates do not match
RuntimeError: if channel selection is invalid
RuntimeError: if multiple frames are returned,
but ``win_dur`` is not set
ValueError: if index is not conform to audformat_
"""
cache_path = None
if cache_root is not None:
cache_root = audeer.mkdir(cache_root)
hash = audformat.utils.hash(index)
cache_path = os.path.join(cache_root, f"{hash}.pkl")
if cache_path and os.path.exists(cache_path):
df = pd.read_pickle(cache_path)
else:
y = self.process.process_index(
index,
root=root,
process_func_args=process_func_args,
)
df = self._series_to_frame(y)
if cache_path is not None:
df.to_pickle(cache_path, protocol=4)
if self.process.segment is None and preserve_index:
# Convert segmented index to filewise index
# if original index was filewise
df.index = index
return df
[docs] def process_signal(
self,
signal: np.ndarray,
sampling_rate: int,
*,
file: str = None,
start: Timestamp = None,
end: Timestamp = None,
process_func_args: typing.Dict[str, typing.Any] = None,
) -> pd.DataFrame:
r"""Extract features for an audio signal.
.. note:: If a ``file`` is given, the index of the returned frame
has levels ``file``, ``start`` and ``end``. Otherwise,
it consists only of ``start`` and ``end``.
Args:
signal: signal values
sampling_rate: sampling rate in Hz
file: file path
start: start processing at this position.
If value is a float or integer it is treated as seconds.
See :func:`audinterface.utils.to_timedelta` for further options
end: end processing at this position.
If value is a float or integer it is treated as seconds.
See :func:`audinterface.utils.to_timedelta` for further options
process_func_args: (keyword) arguments passed on
to the processing function.
They will temporarily overwrite
the ones stored in
:attr:`audinterface.Feature.process.process_func_args`
Raises:
RuntimeError: if sampling rates do not match
RuntimeError: if channel selection is invalid
RuntimeError: if dimension of extracted features
is greater than three
RuntimeError: if feature extractor uses sliding window,
but ``self.win_dur`` is not specified
RuntimeError: if number of features does not match
number of feature names
RuntimeError: if multiple frames are returned,
but ``win_dur`` is not set
"""
series = self.process.process_signal(
signal,
sampling_rate,
file=file,
start=start,
end=end,
process_func_args=process_func_args,
)
return self._series_to_frame(series)
[docs] def process_signal_from_index(
self,
signal: np.ndarray,
sampling_rate: int,
index: pd.MultiIndex,
process_func_args: typing.Dict[str, typing.Any] = None,
) -> pd.DataFrame:
r"""Split a signal into segments and extract features for each segment.
Args:
signal: signal values
sampling_rate: sampling rate in Hz
index: a :class:`pandas.MultiIndex` with two levels
named `start` and `end` that hold start and end
positions as :class:`pandas.Timedelta` objects.
See also :func:`audinterface.utils.signal_index`
process_func_args: (keyword) arguments passed on
to the processing function.
They will temporarily overwrite
the ones stored in
:attr:`audinterface.Feature.process.process_func_args`
Raises:
RuntimeError: if sampling rates do not match
RuntimeError: if channel selection is invalid
RuntimeError: if multiple frames are returned,
but ``win_dur`` is not set
ValueError: if index contains duplicates
"""
series = self.process.process_signal_from_index(
signal,
sampling_rate,
index,
process_func_args=process_func_args,
)
return self._series_to_frame(series)
[docs] def to_numpy(
self,
frame: pd.DataFrame,
) -> np.ndarray:
r"""Return feature values as a numpy array.
The returned :class:`numpy.ndarray`
has the original shape,
i.e. ``(channels, features, time)``.
Args:
frame: feature frame
"""
return frame.values.T.reshape(self.num_channels, self.num_features, -1)
def _reshape_3d(self, features: typing.Union[np.ndarray, pd.Series]):
r"""Reshape to [n_channels, n_features, n_frames]."""
features = np.array(features)
features = np.atleast_1d(features)
if self.process.process_func_is_mono:
# when mono processing is turned on
# the channel dimension has to be 1,
# so we would usually omit it,
# but since older versions required
# a channel dimension we have to
# consider two special cases
if (features.ndim == 4) and (features.shape[1] == 1):
# (channels, 1, features, frames)
# -> (channels, features, frames)
features = features.squeeze(axis=1)
elif (
(features.ndim == 3)
and (self.win_dur is None)
and (features.shape[1] == 1)
):
# (channels, 1, features)
# -> (channels, features)
features = features.squeeze(axis=1)
if self.win_dur and not self.process_func_applies_sliding_window:
# (channels, features)
# -> (channels, features, 1)
features = features.reshape(self.num_channels, -1, 1)
if features.ndim > 3:
raise RuntimeError(
f"Dimension of extracted features must be 1, 2 or 3, "
f"not {features.ndim}."
)
# figure out channels, feature, frames
if features.ndim == 1:
n_channels = 1
n_features = features.size
n_frames = 1
elif features.ndim == 2:
if (features.shape[0] == self.num_channels) and (
features.shape[1] == self.num_features
):
n_channels = features.shape[0]
n_features = features.shape[1]
n_frames = 1
elif features.shape[0] == self.num_features:
n_channels = 1
n_features = features.shape[0]
n_frames = features.shape[1]
else:
raise RuntimeError(
f"Cannot determine feature shape from " f"{features.shape}, ",
f"when expected shape is "
f"({self.num_channels, self.num_features, -1}).",
)
else:
n_channels = features.shape[0]
n_features = features.shape[1]
n_frames = features.shape[2]
# assert channels and features have expected length
if n_channels != self.num_channels:
raise RuntimeError(
f"Number of channels must be"
f" {self.num_channels}, "
f"not "
f"{n_channels}."
)
if n_features != self.num_features:
raise RuntimeError(
f"Number of features must be "
f"{self.num_features}, "
f"not "
f"{n_features}."
)
# reshape features to (channels, features, frames)
return features.reshape([n_channels, n_features, n_frames])
def _series_to_frame(
self,
y: pd.Series,
) -> pd.DataFrame:
if y.empty:
if self.process.segment is None:
index = []
else:
index = audformat.segmented_index()
return pd.DataFrame(
index=index,
columns=self.column_names,
dtype=object,
)
if self.win_dur is not None and self.process_func_applies_sliding_window:
win_dur = utils.to_timedelta(
self.win_dur,
self.process.sampling_rate,
)
hop_dur = utils.to_timedelta(
self.hop_dur,
self.process.sampling_rate,
)
starts = []
ends = []
data = []
if len(y.index.levels) == 3:
files = []
for idx, ((file, start, end), values) in enumerate(y.items()):
frames = self._values_to_frame(values)
data.append(frames)
times = pd.timedelta_range(
start,
freq=hop_dur,
periods=frames.shape[0],
)
starts.extend(times.to_list())
ends.extend((times + win_dur).to_list())
files.extend([file] * len(times))
index = audformat.segmented_index(files, starts, ends)
else:
for idx, ((start, end), values) in enumerate(y.items()):
frames = self._values_to_frame(values)
data.append(frames)
times = pd.timedelta_range(
start,
freq=hop_dur,
periods=frames.shape[0],
)
starts.extend(times.to_list())
ends.extend((times + win_dur).to_list())
index = utils.signal_index(starts, ends)
else:
index = y.index
data = [self._values_to_frame(values) for values in y]
data = np.concatenate(data)
df = pd.DataFrame(
data,
index=index,
columns=self.column_names,
)
return df
def _values_to_frame(
self,
features: np.ndarray,
) -> np.ndarray:
# Convert features to a pd.DataFrame
# Assumed formats are:
# [n_channels, n_features, n_frames]
# [n_channels, n_features]
# [n_features, n_frames]
# [n_features]
features = self._reshape_3d(features)
n_channels, n_features, n_frames = features.shape
# Combine features and channels into one dimension
# f1-c1, f2-c1, ..., fN-c1, ..., f1-cM, f2-cM, ..., fN-cM
new_shape = (n_channels * n_features, n_frames)
features = features.reshape(new_shape).T
if n_frames > 1 and self.win_dur is None:
raise RuntimeError(
f"Got " f"{n_frames} " f"frames, but 'win_dur' is not set."
)
return features
[docs] def __call__(
self,
signal: np.ndarray,
sampling_rate: int,
) -> np.ndarray:
r"""Apply processing to signal.
This function processes the signal
**without** transforming the output
into a :class:`pandas.DataFrame`.
Instead, it will return the raw processed signal.
However,
if channel selection,
mixdown and/or resampling is enabled,
the signal will be first remixed
and resampled if the input sampling rate
does not fit the expected sampling rate.
Args:
signal: signal values
sampling_rate: sampling rate in Hz
Returns:
feature array with shape
``(num_channels, num_features, num_frames)``
Raises:
RuntimeError: if sampling rates do not match
RuntimeError: if channel selection is invalid
RuntimeError: if multiple frames are returned,
but ``win_dur`` is not set
"""
y = self.process(
signal,
sampling_rate,
)
return self._reshape_3d(y)