Source code for audformat.core.index

from __future__ import annotations

import numpy as np
import pandas as pd

from audformat.core import define
from audformat.core import utils
from audformat.core.typing import Files
from audformat.core.typing import Timestamps


def is_scalar(value: object) -> bool:
    r"""Check if value is scalar."""
    return (value is not None) and (
        isinstance(value, str) or not hasattr(value, "__len__")
    )


def to_array(value: object) -> list | np.ndarray:
    r"""Convert value to list or array."""
    if value is not None:
        if isinstance(value, (pd.Series, pd.DataFrame, pd.Index)):
            value = value.to_numpy()
        elif is_scalar(value):
            value = [value]
    return value


def to_timedelta(times):
    r"""Convert time value to pd.Timedelta."""
    try:
        return pd.to_timedelta(times, unit="s")
    except ValueError:  # catches values like '1s'
        return pd.to_timedelta(times)


[docs]def assert_index( obj: pd.Index | pd.Series | pd.DataFrame, ): r"""Assert object is conform to :ref:`table specifications <data-tables:Tables>`. This does not check for duplicates in the index. If you need that check use :func:`audformat.assert_no_duplicates` in addition. Args: obj: object Raises: ValueError: if not conform to :ref:`table specifications <data-tables:Tables>` """ # noqa: D205 if isinstance(obj, (pd.Series, pd.DataFrame)): obj = obj.index num = len(obj.names) if num != 1 and num != 3: raise ValueError( "Index not conform to audformat. " f"Found " f"{num} " f"levels, but expected 1 or 3 levels." ) if num == 1: if obj.names[0] != define.IndexField.FILE: raise ValueError( "Index not conform to audformat. " "Found single level with name " f"{obj.names[0]}, " f"but expected name " f"'{define.IndexField.FILE}'." ) if not pd.api.types.is_string_dtype(obj.dtype): raise ValueError( "Index not conform to audformat. " "Level 'file' must contain values of type 'string'." ) elif num == 3: if not ( obj.names[0] == define.IndexField.FILE and obj.names[1] == define.IndexField.START and obj.names[2] == define.IndexField.END ): expected_names = [ define.IndexField.FILE, define.IndexField.START, define.IndexField.END, ] raise ValueError( "Index not conform to audformat. " "Found three levels with names " f"{obj.names}, " f"but expected names " f"{expected_names}." ) if not pd.api.types.is_string_dtype(obj.levels[0].dtype): raise ValueError( "Index not conform to audformat. " "Level 'file' must contain values of type 'string'." ) if not pd.api.types.is_timedelta64_dtype(obj.levels[1].dtype): raise ValueError( "Index not conform to audformat. " "Level 'start' must contain values of type 'timedelta64[ns]'." ) if not pd.api.types.is_timedelta64_dtype(obj.levels[2].dtype): raise ValueError( "Index not conform to audformat. " "Level 'end' must contain values of type 'timedelta64[ns]'." )
[docs]def assert_no_duplicates( obj: pd.Index | pd.Series | pd.DataFrame, ): r"""Assert object contains no duplicates in its index. The :ref:`table specifications <data-tables:Tables>` allow no duplicated index entries. To save time we do not test for this in :func:`audformat.assert_index`. Args: obj: object Raises: ValueError: if duplicates are found """ if isinstance(obj, (pd.Series, pd.DataFrame)): obj = obj.index if obj.has_duplicates: max_display = 10 duplicates = obj[obj.duplicated()] msg_tail = "\n..." if len(duplicates) > max_display else "" msg_duplicates = "\n".join( [str(duplicate) for duplicate in duplicates[:max_display].tolist()] ) raise ValueError( "Index not conform to audformat. " "Found duplicates:\n" f"{msg_duplicates}{msg_tail}" )
[docs]def filewise_index( files: Files = None, ) -> pd.Index: r"""Creates a filewise index. Index is conform to :ref:`table specifications <data-tables:Tables>`. Args: files: list of files Returns: filewise index Raises: ValueError: if created index contains duplicates Examples: >>> filewise_index(["a.wav", "b.wav"]) Index(['a.wav', 'b.wav'], dtype='string', name='file') """ if files is None: files = [] files = to_array(files) index = pd.Index( files, name=define.IndexField.FILE, dtype="string", ) assert_index(index) return index
[docs]def index_type( obj: pd.Index | pd.Series | pd.DataFrame, ) -> define.IndexType: r"""Derive index type. Possible return values are given by :class:`audformat.define.IndexType`. Args: obj: object conform to :ref:`table specifications <data-tables:Tables>` Returns: table type Raises: ValueError: if not conform to :ref:`table specifications <data-tables:Tables>` Examples: >>> index_type(filewise_index()) 'filewise' >>> index_type(segmented_index()) 'segmented' """ if isinstance(obj, (pd.Series, pd.DataFrame)): obj = obj.index assert_index(obj) if len(obj.names) == 1: return define.IndexType.FILEWISE else: return define.IndexType.SEGMENTED
[docs]def is_filewise_index( obj: pd.Index | pd.Series | pd.DataFrame, ) -> bool: r"""Check if object has a filewise index. Returns ``True`` if index is a filewise index conform to :ref:`table specifications <data-tables:Tables>`. Args: obj: object Returns: ``True`` if index is filewise, otherwise ``False`` Examples: >>> is_filewise_index(filewise_index()) True >>> is_filewise_index(pd.Index([])) False """ if not isinstance(obj, pd.Index): obj = obj.index return len(obj.names) == 1 and obj.names[0] == define.IndexField.FILE
[docs]def is_segmented_index( obj: pd.Index | pd.Series | pd.DataFrame, ) -> bool: r"""Check if object has a segmented index. Returns ``True`` if index is a segmented index conform to :ref:`table specifications <data-tables:Tables>`. Args: obj: object Returns: ``True`` if index is segmented, otherwise ``False`` Examples: >>> is_segmented_index(segmented_index()) True >>> is_segmented_index(pd.Index([])) False """ if not isinstance(obj, pd.Index): obj = obj.index return ( len(obj.names) == 3 and obj.names[0] == define.IndexField.FILE and obj.names[1] == define.IndexField.START and obj.names[2] == define.IndexField.END )
[docs]def segmented_index( files: Files = None, starts: Timestamps = None, ends: Timestamps = None, ) -> pd.Index: r"""Create segmented index. Index is conform to :ref:`table specifications <data-tables:Tables>`. If a non-empty index is created and ``starts`` is set to ``None``, the level will be filled up with ``0``. If a non-empty index is created and ``ends`` is set to ``None``, the level will be filled up with ``NaT``. Args: files: set confidence values only on a sub-set of files starts: segment start positions. Time values given as float or integers are treated as seconds ends: segment end positions. Time values given as float or integers are treated as seconds Returns: segmented index Raises: ValueError: if created index contains duplicates Raises: ValueError: if ``files``, ``start`` and ``ends`` differ in size Examples: >>> segmented_index("a.wav", 0, 1.1) MultiIndex([('a.wav', '0 days', '0 days 00:00:01.100000')], names=['file', 'start', 'end']) >>> segmented_index("a.wav", "0ms", "1ms") MultiIndex([('a.wav', '0 days', '0 days 00:00:00.001000')], names=['file', 'start', 'end']) >>> segmented_index(["a.wav", "b.wav"]) MultiIndex([('a.wav', '0 days', NaT), ('b.wav', '0 days', NaT)], names=['file', 'start', 'end']) >>> segmented_index(["a.wav", "b.wav"], [None, 1], [1, None]) MultiIndex([('a.wav', NaT, '0 days 00:00:01'), ('b.wav', '0 days 00:00:01', NaT)], names=['file', 'start', 'end']) >>> segmented_index( ... files=["a.wav", "a.wav"], ... starts=[0, 1], ... ends=pd.to_timedelta([1000, 2000], unit="ms"), ... ) MultiIndex([('a.wav', '0 days 00:00:00', '0 days 00:00:01'), ('a.wav', '0 days 00:00:01', '0 days 00:00:02')], names=['file', 'start', 'end']) """ files = to_array(files) starts = to_array(starts) ends = to_array(ends) if files is None: files = [] num_files = len(files) if starts is None: starts = [0] * num_files if ends is None: ends = [pd.NaT] * num_files if num_files != len(starts) or num_files != len(ends): raise ValueError( "Cannot create segmented table if 'files', " "'starts', and 'ends' differ in size", ) index = pd.MultiIndex.from_arrays( [files, to_timedelta(starts), to_timedelta(ends)], names=[ define.IndexField.FILE, define.IndexField.START, define.IndexField.END, ], ) index = utils.set_index_dtypes(index, {define.IndexField.FILE: "string"}) assert_index(index) return index