Source code for audformat.core.index

import typing

import numpy as np
import pandas as pd

from audformat.core import define
from audformat.core import utils
from audformat.core.typing import Files
from audformat.core.typing import Timestamps


def is_scalar(value: typing.Any) -> bool:
    r"""Check if value is scalar."""
    return (value is not None) and (
        isinstance(value, str) or not hasattr(value, "__len__")
    )


def to_array(value: typing.Any) -> typing.Union[list, np.ndarray]:
    r"""Convert value to list or array."""
    if value is not None:
        if isinstance(value, (pd.Series, pd.DataFrame, pd.Index)):
            value = value.to_numpy()
        elif is_scalar(value):
            value = [value]
    return value


def to_timedelta(times):
    r"""Convert time value to pd.Timedelta."""
    try:
        return pd.to_timedelta(times, unit="s")
    except ValueError:  # catches values like '1s'
        return pd.to_timedelta(times)


[docs]def assert_index( obj: typing.Union[pd.Index, pd.Series, pd.DataFrame], ): r"""Assert object is conform to :ref:`table specifications <data-tables:Tables>`. This does not check for duplicates in the index. If you need that check use :func:`audformat.assert_no_duplicates` in addition. Args: obj: object Raises: ValueError: if not conform to :ref:`table specifications <data-tables:Tables>` """ # noqa: D205 if isinstance(obj, (pd.Series, pd.DataFrame)): obj = obj.index num = len(obj.names) if num != 1 and num != 3: raise ValueError( "Index not conform to audformat. " f"Found " f"{num} " f"levels, but expected 1 or 3 levels." ) if num == 1: if obj.names[0] != define.IndexField.FILE: raise ValueError( "Index not conform to audformat. " "Found single level with name " f"{obj.names[0]}, " f"but expected name " f"'{define.IndexField.FILE}'." ) if not pd.api.types.is_string_dtype(obj.dtype): raise ValueError( "Index not conform to audformat. " "Level 'file' must contain values of type 'string'." ) elif num == 3: if not ( obj.names[0] == define.IndexField.FILE and obj.names[1] == define.IndexField.START and obj.names[2] == define.IndexField.END ): expected_names = [ define.IndexField.FILE, define.IndexField.START, define.IndexField.END, ] raise ValueError( "Index not conform to audformat. " "Found three levels with names " f"{obj.names}, " f"but expected names " f"{expected_names}." ) if not pd.api.types.is_string_dtype(obj.levels[0].dtype): raise ValueError( "Index not conform to audformat. " "Level 'file' must contain values of type 'string'." ) if not pd.api.types.is_timedelta64_dtype(obj.levels[1].dtype): raise ValueError( "Index not conform to audformat. " "Level 'start' must contain values of type 'timedelta64[ns]'." ) if not pd.api.types.is_timedelta64_dtype(obj.levels[2].dtype): raise ValueError( "Index not conform to audformat. " "Level 'end' must contain values of type 'timedelta64[ns]'." )
[docs]def assert_no_duplicates( obj: typing.Union[pd.Index, pd.Series, pd.DataFrame], ): r"""Assert object contains no duplicates in its index. The :ref:`table specifications <data-tables:Tables>` allow no duplicated index entries. To save time we do not test for this in :func:`audformat.assert_index`. Args: obj: object Raises: ValueError: if duplicates are found """ if isinstance(obj, (pd.Series, pd.DataFrame)): obj = obj.index if obj.has_duplicates: max_display = 10 duplicates = obj[obj.duplicated()] msg_tail = "\n..." if len(duplicates) > max_display else "" msg_duplicates = "\n".join( [str(duplicate) for duplicate in duplicates[:max_display].tolist()] ) raise ValueError( "Index not conform to audformat. " "Found duplicates:\n" f"{msg_duplicates}{msg_tail}" )
[docs]def filewise_index( files: Files = None, ) -> pd.Index: r"""Creates a filewise index. Index is conform to :ref:`table specifications <data-tables:Tables>`. Args: files: list of files Returns: filewise index Raises: ValueError: if created index contains duplicates Examples: >>> filewise_index(["a.wav", "b.wav"]) Index(['a.wav', 'b.wav'], dtype='string', name='file') """ if files is None: files = [] files = to_array(files) index = pd.Index( files, name=define.IndexField.FILE, dtype="string", ) assert_index(index) return index
[docs]def index_type( obj: typing.Union[pd.Index, pd.Series, pd.DataFrame], ) -> define.IndexType: r"""Derive index type. Possible return values are given by :class:`audformat.define.IndexType`. Args: obj: object conform to :ref:`table specifications <data-tables:Tables>` Returns: table type Raises: ValueError: if not conform to :ref:`table specifications <data-tables:Tables>` Examples: >>> index_type(filewise_index()) 'filewise' >>> index_type(segmented_index()) 'segmented' """ if isinstance(obj, (pd.Series, pd.DataFrame)): obj = obj.index assert_index(obj) if len(obj.names) == 1: return define.IndexType.FILEWISE else: return define.IndexType.SEGMENTED
[docs]def is_filewise_index( obj: typing.Union[pd.Index, pd.Series, pd.DataFrame], ) -> bool: r"""Check if object has a filewise index. Returns ``True`` if index is a filewise index conform to :ref:`table specifications <data-tables:Tables>`. Args: obj: object Returns: ``True`` if index is filewise, otherwise ``False`` Examples: >>> is_filewise_index(filewise_index()) True >>> is_filewise_index(pd.Index([])) False """ if not isinstance(obj, pd.Index): obj = obj.index return len(obj.names) == 1 and obj.names[0] == define.IndexField.FILE
[docs]def is_segmented_index( obj: typing.Union[pd.Index, pd.Series, pd.DataFrame], ) -> bool: r"""Check if object has a segmented index. Returns ``True`` if index is a segmented index conform to :ref:`table specifications <data-tables:Tables>`. Args: obj: object Returns: ``True`` if index is segmented, otherwise ``False`` Examples: >>> is_segmented_index(segmented_index()) True >>> is_segmented_index(pd.Index([])) False """ if not isinstance(obj, pd.Index): obj = obj.index return ( len(obj.names) == 3 and obj.names[0] == define.IndexField.FILE and obj.names[1] == define.IndexField.START and obj.names[2] == define.IndexField.END )
[docs]def segmented_index( files: Files = None, starts: Timestamps = None, ends: Timestamps = None, ) -> pd.Index: r"""Create segmented index. Index is conform to :ref:`table specifications <data-tables:Tables>`. If a non-empty index is created and ``starts`` is set to ``None``, the level will be filled up with ``0``. If a non-empty index is created and ``ends`` is set to ``None``, the level will be filled up with ``NaT``. Args: files: set confidence values only on a sub-set of files starts: segment start positions. Time values given as float or integers are treated as seconds ends: segment end positions. Time values given as float or integers are treated as seconds Returns: segmented index Raises: ValueError: if created index contains duplicates Raises: ValueError: if ``files``, ``start`` and ``ends`` differ in size Examples: >>> segmented_index("a.wav", 0, 1.1) MultiIndex([('a.wav', '0 days', '0 days 00:00:01.100000')], names=['file', 'start', 'end']) >>> segmented_index("a.wav", "0ms", "1ms") MultiIndex([('a.wav', '0 days', '0 days 00:00:00.001000')], names=['file', 'start', 'end']) >>> segmented_index(["a.wav", "b.wav"]) MultiIndex([('a.wav', '0 days', NaT), ('b.wav', '0 days', NaT)], names=['file', 'start', 'end']) >>> segmented_index(["a.wav", "b.wav"], [None, 1], [1, None]) MultiIndex([('a.wav', NaT, '0 days 00:00:01'), ('b.wav', '0 days 00:00:01', NaT)], names=['file', 'start', 'end']) >>> segmented_index( ... files=["a.wav", "a.wav"], ... starts=[0, 1], ... ends=pd.to_timedelta([1000, 2000], unit="ms"), ... ) MultiIndex([('a.wav', '0 days 00:00:00', '0 days 00:00:01'), ('a.wav', '0 days 00:00:01', '0 days 00:00:02')], names=['file', 'start', 'end']) """ files = to_array(files) starts = to_array(starts) ends = to_array(ends) if files is None: files = [] num_files = len(files) if starts is None: starts = [0] * num_files if ends is None: ends = [pd.NaT] * num_files if num_files != len(starts) or num_files != len(ends): raise ValueError( "Cannot create segmented table if 'files', " "'starts', and 'ends' differ in size", ) index = pd.MultiIndex.from_arrays( [files, to_timedelta(starts), to_timedelta(ends)], names=[ define.IndexField.FILE, define.IndexField.START, define.IndexField.END, ], ) index = utils.set_index_dtypes(index, {define.IndexField.FILE: "string"}) assert_index(index) return index