from __future__ import annotations  # allow typing without string

import typing
import warnings

import numpy as np
import pandas as pd

from audformat.core import define
from audformat.core.common import HeaderBase
from audformat.core.common import to_audformat_dtype
from audformat.core.common import to_pandas_dtype
from audformat.core.index import index_type
from audformat.core.index import is_scalar
from audformat.core.index import to_array
from audformat.core.rater import Rater
from audformat.core.typing import Values

if typing.TYPE_CHECKING:
    # Fix to make mypy work without circular imports,
    # compare
    from audformat.core.scheme import Scheme  # pragma: nocover

def assert_values(
    values: Values,
    scheme: Scheme,
    r"""Raise error if values do not match scheme."""
    bad_values = []

    if (
        scheme.labels is not None
        or scheme.minimum is not None
        or scheme.maximum is not None
        if is_scalar(values):
            values = [values]
        elif isinstance(values, pd.Series):
            values = values.values
        elif isinstance(values, np.ndarray):
            if scheme.is_numeric:
                # Support type object with None entries,
                # which need to be converted to NaN
                # to support min/max
                values = values.astype(float)
                values = [
        # Get unique values and preserve order
        values = list(dict.fromkeys(values))
        bad_values = [value for value in values if value not in scheme]

    if len(bad_values) > 0:
        max_display = 10
        values = str(bad_values[:max_display])[1:-1]
        if len(bad_values) > max_display:
            values += ", ..."
        raise ValueError(
            f"Some value(s) do not match scheme\n{scheme}\n"
            f"with scheme ID '{scheme._id}':\n"

[docs]class Column(HeaderBase): r"""Table column. Represents a table column (see :class:`audformat.Table`) and optionally links it to a scheme (see :class:`audformat.Scheme`) and a rater (see :class:`audformat.Rater`). Args: scheme_id: scheme identifier (must exist) rater_id: rater identifier (must exist) description: table description meta: additional meta fields Examples: >>> Column(scheme_id="emotion") {scheme_id: emotion} """ def __init__( self, *, scheme_id: str = None, rater_id: str = None, description: str = None, meta: dict = None, ): super().__init__(description=description, meta=meta) self.scheme_id = scheme_id r"""Scheme identifier""" self.rater_id = rater_id r"""Rater identifier""" self._table = None self._id = None @property def rater(self) -> typing.Optional[Rater]: r"""Rater object. Returns: rater object or ``None`` if not available """ if ( (self.rater_id is not None) and (self.table is not None) and (self.table.db is not None) ): return self.table.db.raters[self.rater_id] @property def scheme(self) -> typing.Optional[Scheme]: r"""Scheme object. Returns: scheme object or ``None`` if not available """ if ( (self.scheme_id is not None) and (self.table is not None) and (self.table.db is not None) ): return self.table.db.schemes[self.scheme_id] @property def table(self): r"""Table object. Returns: table object or ``None`` if not assigned yet """ return self._table
[docs] def get( self, index: pd.Index = None, *, map: str = None, copy: bool = True, as_segmented: bool = False, allow_nat: bool = True, root: str = None, num_workers: typing.Optional[int] = 1, verbose: bool = False, ) -> pd.Series: r"""Get labels. By default, all labels of the column are returned, use ``index`` to get a subset. Examples are provided with the :ref:`table specifications <data-tables:Tables>`. Args: index: index conform to :ref:`table specifications <data-tables:Tables>` copy: return a copy of the labels map: :ref:`map scheme or scheme field to column values <map-scheme-labels>`. For example if your column holds speaker IDs and is assigned to a scheme that contains a dict mapping speaker IDs to age entries, ``map='age'`` will replace the ID values with the age of the speaker as_segmented: if set to ``True`` and column has a filewise index, the index of the returned column will be converted to a segmented index. ``start`` will be set to ``0`` and ``end`` to ``NaT`` or to the file duration if ``allow_nat`` is set to ``False``. If column belongs to a miscellaneous table, this and the following arguments have no effect allow_nat: if set to ``False``, ``end=NaT`` is replaced with file duration root: root directory under which the files are stored. Provide if file names are relative and database was not saved or loaded from disk. If ``None`` :attr:`audformat.Database.root` is used. Only relevant if ``allow_nat`` is set to ``False`` num_workers: number of parallel jobs. If ``None`` will be set to the number of processors on the machine multiplied by 5 verbose: show progress bar Returns: labels Raises: FileNotFoundError: if file is not found RuntimeError: if column is not assigned to a table ValueError: if trying to map without a scheme, or from a scheme that has no labels, or from a scheme that has only a list of labels, or to a non-existing field """ if self._table is None: raise RuntimeError("Column is not assigned to a table.") if hasattr(self._table, "type"): result = self._table.get( index, copy=False, as_segmented=as_segmented, allow_nat=allow_nat, root=root, num_workers=num_workers, verbose=verbose, ) else: result = self._table.get( index, copy=False, ) result = result[self._id] if map is not None: copy = False # to avoid another copy if self.scheme_id is None: raise ValueError(f"Column '{self._id}' is not assigned to a scheme.") scheme = self._table._db.schemes[self.scheme_id] labels = scheme._labels_to_dict() if labels is None: raise ValueError(f"Scheme '{self.scheme_id}' has no labels.") if not any(labels.values()): raise ValueError( f"Scheme '{self.scheme_id}' provides no mapping " "for its labels." ) # Check that at least one key is available for map # if labels are stored as dictionary keys = [] for key, value in labels.items(): if isinstance(value, dict): keys += list(value.keys()) keys = sorted(list(set(keys))) if len(keys) > 0 and map not in keys: raise ValueError( f"Cannot map " f"'{self._id}' " f"to " f"'{map}'. " f"Expected one of " f"{list(keys)}." ) mapping = {} for key, value in labels.items(): if isinstance(value, dict): if map in value: value = value[map] else: value = np.NaN mapping[key] = value result = = map if ( scheme.uses_table and self._table._db[scheme.labels][map].scheme is not None # ^ ^ # misc table column ): # Infer dtype from misc table misc_table_id = scheme.labels column = self._table._db[misc_table_id][map] dtype = column.scheme.to_pandas_dtype() else: # Infer dtype from actual labels dtype = pd.api.types.infer_dtype(list(result.values)) dtype = to_pandas_dtype(to_audformat_dtype(dtype)) result = result.astype(dtype) return result.copy() if copy else result
[docs] def set( self, values: Values, *, index: pd.Index = None, ): r"""Set labels. By default, all labels of the column are replaced, use ``index`` to set a subset. If columns is assigned to a :class:`Scheme` values will be automatically converted to match its dtype. Examples are provided with the :ref:`table specifications <data-tables:Tables>`. Args: values: list of values index: index conform to :ref:`table specifications <data-tables:Tables>` Raises: RuntimeError: if column is not assign to a table ValueError: if trying to set values of a filewise column using a segmented index ValueError: if values cannot be converted to match the schemes dtype """ if self._table is None: raise RuntimeError("Column is not assigned to a table.") column_id = self._id df = self._table.df if index is None: index = df.index if self.scheme_id is not None: scheme = self._table._db.schemes[self.scheme_id] assert_values(values, scheme) dtype = scheme.to_pandas_dtype() else: dtype = df[column_id].dtype if hasattr(self._table, "type") and self._table.type != index_type(index): # special case where a filewise / segmented table # is requested with an index of the other type if not self._table.is_filewise: files = index.get_level_values(define.IndexField.FILE) index = df.loc[files].index return self.set(values, index=index) else: raise ValueError( "Cannot set values of a filewise column " "using a segmented index." ) else: if is_scalar(values): values = [values] * len(index) values = to_array(values) if dtype == "datetime64[ns]": # Ensure all date values are timezone unaware, # see values = [ pd.to_datetime(value).tz_localize(None) if value is not None else value for value in values ] with warnings.catch_warnings(): # Avoid FutureWarning and DeprecationWarning # for pandas 1.5.0 to 1.5.3 # for setting values in place # as introduced at # # For pandas >=2.0.0 values are always set in place for warning in [FutureWarning, DeprecationWarning]: warnings.simplefilter(action="ignore", category=warning) df.loc[index, column_id] = pd.Series( values, index=index, dtype=dtype, )
[docs] def __eq__( self, other: "Column", ) -> bool: r"""Compare if column equals another column.""" if self.dump() != other.dump(): return False if self._table is not None and other._table is not None: return self._table.df[self._id].equals(other._table.df[other._id]) return self._table is None and other._table is None