from __future__ import annotations # allow typing without string
import copy
import os
import pickle
import typing
import pandas as pd
import audeer
from audformat.core import define
from audformat.core import utils
from audformat.core.column import Column
from audformat.core.common import HeaderBase
from audformat.core.common import HeaderDict
from audformat.core.common import to_audformat_dtype
from audformat.core.common import to_pandas_dtype
from audformat.core.errors import BadIdError
from audformat.core.index import filewise_index
from audformat.core.index import index_type
from audformat.core.index import is_filewise_index
from audformat.core.index import is_segmented_index
from audformat.core.media import Media
from audformat.core.split import Split
from audformat.core.typing import Values
class Base(HeaderBase):
r"""Table base class."""
def __init__(
self,
index: pd.Index = None,
*,
split_id: str = None,
media_id: str = None,
description: str = None,
meta: dict = None,
):
super().__init__(description=description, meta=meta)
self.split_id = split_id
r"""Split ID"""
self.media_id = media_id
r"""Media ID"""
self.columns = HeaderDict(
sort_by_key=False,
value_type=Column,
set_callback=self._set_column,
)
r"""Table columns"""
self._df = pd.DataFrame(index=index)
self._db = None
self._id = None
def __add__(self, other: typing.Self) -> typing.Self:
r"""Create new table by combining two tables.
The new :ref:`combined table <combine-tables>`
contains index and columns of both tables.
Missing values will be set to ``NaN``.
If table is conform to
:ref:`table specifications <data-tables:Tables>`
and at least one table is segmented,
the output has a segmented index.
Columns with the same identifier are combined to a single column.
This requires that:
1. both columns have the same dtype
2. in places where the indices overlap the values of both columns
match or one column contains ``NaN``
Media and split information,
as well as,
references to schemes and raters are discarded.
If you intend to keep them,
use ``update()``.
Args:
other: the other table
Raises:
ValueError: if columns with the same name have different dtypes
ValueError: if values in the same position do not match
ValueError: if level and dtypes of indices do not match
"""
df = utils.concat([self.df, other.df])
table = self.__new__(type(self))
table.__init__(df.index)
for column_id in df:
table[column_id] = Column()
table._df = df
return table
def __getitem__(self, column_id: str) -> Column:
r"""Return view to a column.
Args:
column_id: column identifier
"""
return self.columns[column_id]
def __eq__(
self,
other: Base,
) -> bool:
r"""Compare if table equals other table."""
if self.dump() != other.dump():
return False
return self.df.equals(other.df)
def __len__(self) -> int:
r"""Number of rows in table."""
return len(self.df)
def __setitem__(self, column_id: str, column: Column) -> Column:
r"""Add new column to table.
Args:
column_id: column identifier
column: column
Raises:
BadIdError: if a column with a ``scheme_id`` or ``rater_id`` is
added that does not exist
ValueError: if column ID is not different from level names
ValueError: if the column is linked to a scheme
that is using labels from a misc table,
but the misc table the column is assigned to
is already used by the same or another scheme
"""
if (
column.scheme_id is not None
and self.db is not None
and column.scheme_id in self.db.schemes
):
# check if scheme uses
# labels from a table
scheme = self.db.schemes[column.scheme_id]
if scheme.uses_table:
# check if scheme uses
# labels from this table
if self._id == scheme.labels:
raise ValueError(
f"Scheme "
f"'{column.scheme_id}' "
f"uses misc table "
f"'{self._id}' "
f"as labels and cannot be used "
f"with columns of the same table."
)
# check if this table
# is already used with a scheme
for scheme_id in self.db.schemes:
if self._id == self.db.schemes[scheme_id].labels:
raise ValueError(
f"Since the misc table "
f"'{self._id}' "
f"is used as labels in scheme "
f"'{scheme_id}' "
f"its columns cannot be used with a scheme "
f"that also uses labels from a misc table."
)
self.columns[column_id] = column
return column
@property
def db(self):
r"""Database object.
Returns:
database object or ``None`` if not assigned yet
"""
return self._db
@property
def df(self) -> pd.DataFrame:
r"""Table data.
Returns:
data
"""
if self._df is None:
# if database was loaded with 'load_data=False'
# we have to load the table data now
path = os.path.join(self.db.root, f"{self.db._name}.{self._id}")
self.load(path)
return self._df
@property
def index(self) -> pd.Index:
r"""Table index.
Returns:
index
"""
return self.df.index
@property
def media(self) -> typing.Optional[Media]:
r"""Media object.
Returns:
media object or ``None`` if not available
"""
if self.media_id is not None and self.db is not None:
return self.db.media[self.media_id]
@property
def split(self) -> typing.Optional[Split]:
r"""Split object.
Returns:
split object or ``None`` if not available
"""
if self.split_id is not None and self.db is not None:
return self.db.splits[self.split_id]
def copy(self) -> typing.Self:
r"""Copy table.
Return:
new table object
"""
table = self.__class__(
self.df.index,
media_id=self.media_id,
split_id=self.split_id,
)
table._db = self.db
for column_id, column in self.columns.items():
table.columns[column_id] = Column(
scheme_id=column.scheme_id,
rater_id=column.rater_id,
description=column.description,
meta=column.meta.copy(),
)
table._df = self.df.copy()
return table
def drop_columns(
self,
column_ids: typing.Union[str, typing.Sequence[str]],
*,
inplace: bool = False,
) -> typing.Self:
r"""Drop columns by ID.
Args:
column_ids: column IDs
inplace: drop columns in place
Returns:
new object if ``inplace=False``, otherwise ``self``
"""
if not inplace:
return self.copy().drop_columns(column_ids, inplace=True)
if isinstance(column_ids, str):
column_ids = [column_ids]
column_ids_ = set()
for column_id in column_ids:
column_ids_.add(column_id)
self.df.drop(column_ids_, inplace=True, axis="columns")
for column_id in column_ids_:
self.columns.pop(column_id)
return self
def drop_index(
self,
index: pd.Index,
*,
inplace: bool = False,
) -> typing.Self:
r"""Drop rows from index.
Args:
index: index object
inplace: drop index in place
Returns:
new object if ``inplace=False``, otherwise ``self``
Raises:
ValueError: if level and dtypes of index does not match table index
"""
table = self if inplace else self.copy()
index = _maybe_convert_dtype_to_string(index)
_assert_table_index(table, index, "drop rows from")
index = utils.intersect([table.index, index])
new_index = utils.difference([table.index, index])
table._df = table.df.reindex(new_index)
if inplace:
_maybe_update_scheme(table)
return table
def extend_index(
self,
index: pd.Index,
*,
fill_values: typing.Union[typing.Any, typing.Dict[str, typing.Any]] = None,
inplace: bool = False,
) -> typing.Self:
r"""Extend table with new rows.
Args:
index: index object
fill_values: replace NaN with these values (either a scalar
applied to all columns or a dictionary with column name as
key)
inplace: extend index in place
Returns:
new object if ``inplace=False``, otherwise ``self``
Raises:
ValueError: if level and dtypes of index does not match table index
"""
table = self if inplace else self.copy()
index = _maybe_convert_dtype_to_string(index)
_assert_table_index(table, index, "extend")
new_index = utils.union([table.index, index])
table._df = table.df.reindex(new_index)
if fill_values is not None:
if isinstance(fill_values, dict):
for key, value in fill_values.items():
table.df.fillna({key: value}, inplace=True)
else:
table.df.fillna(fill_values, inplace=True)
if inplace:
_maybe_update_scheme(table)
return table
def get(
self,
index: pd.Index = None,
*,
map: typing.Dict[str, typing.Union[str, typing.Sequence[str]]] = None,
copy: bool = True,
) -> pd.DataFrame:
r"""Get labels.
By default, all labels of the table are returned,
use ``index`` to get a subset.
Examples are provided with the
:ref:`table specifications <data-tables:Tables>`,
and for ``map`` in :ref:`map-scheme-labels`.
Args:
index: index
copy: return a copy of the labels
map: map scheme or scheme fields to column values.
For example if your table holds a column ``speaker`` with
speaker IDs, which is assigned to a scheme that contains a
dict mapping speaker IDs to age and gender entries,
``map={'speaker': ['age', 'gender']}``
will replace the column with two new columns that map ID
values to age and gender, respectively.
To also keep the original column with speaker IDS, you can do
``map={'speaker': ['speaker', 'age', 'gender']}``
Returns:
labels
Raises:
FileNotFoundError: if file is not found
RuntimeError: if table is not assign to a database
ValueError: if trying to map without a scheme
ValueError: if trying to map from a scheme that has no labels
ValueError: if trying to map to a non-existing field
"""
result_is_copy = False
if index is None:
result = self.df
else:
result = self._get_by_index(index)
if map is not None:
if self.db is None:
raise RuntimeError(
"Cannot map schemes, " "table is not assigned to a database."
)
if not result_is_copy:
result = result.copy()
result_is_copy = True # to avoid another copy
for column, mapped_columns in map.items():
mapped_columns = audeer.to_list(mapped_columns)
if len(mapped_columns) == 1:
result[mapped_columns[0]] = self.columns[column].get(
index,
map=mapped_columns[0],
)
else:
for mapped_column in mapped_columns:
if mapped_column != column:
result[mapped_column] = self.columns[column].get(
index,
map=mapped_column,
)
if column not in mapped_columns:
result.drop(columns=column, inplace=True)
return result.copy() if (copy and not result_is_copy) else result
def load(
self,
path: str,
):
r"""Load table data from disk.
Tables can be stored as PKL and/or CSV files to disk.
If both files are present
it will load the PKL file
as long as its modification date is newer,
otherwise it will raise an error
and ask to delete one of the files.
Args:
path: file path without extension
Raises:
RuntimeError: if table file(s) are missing
RuntimeError: if CSV file is newer than PKL file
"""
path = audeer.path(path)
pkl_file = f"{path}.{define.TableStorageFormat.PICKLE}"
csv_file = f"{path}.{define.TableStorageFormat.CSV}"
if not os.path.exists(pkl_file) and not os.path.exists(csv_file):
raise RuntimeError(
f"No file found for table with path '{path}.{{pkl|csv}}'"
)
# Load from PKL if file exists and is newer then CSV file.
# If both are written by Database.save() this is the case
# as it stores first the PKL file
pickled = False
if os.path.exists(pkl_file):
if os.path.exists(csv_file) and os.path.getmtime(
csv_file
) > os.path.getmtime(pkl_file):
raise RuntimeError(
f"The table CSV file '{csv_file}' is newer "
f"than the table PKL file '{pkl_file}'. "
"If you want to load from the CSV file, "
"please delete the PKL file. "
"If you want to load from the PKL file, "
"please delete the CSV file."
)
pickled = True
if pickled:
try:
self._load_pickled(pkl_file)
except (AttributeError, ValueError, EOFError) as ex:
# if exception is raised (e.g. unsupported pickle protocol)
# try to load from CSV and save it again
# otherwise raise error
if os.path.exists(csv_file):
self._load_csv(csv_file)
self._save_pickled(pkl_file)
else:
raise ex
else:
self._load_csv(csv_file)
def pick_columns(
self,
column_ids: typing.Union[str, typing.Sequence[str]],
*,
inplace: bool = False,
) -> typing.Self:
r"""Pick columns by ID.
All other columns will be dropped.
Args:
column_ids: column IDs
inplace: pick columns in place
Returns:
new object if ``inplace=False``, otherwise ``self``
"""
if isinstance(column_ids, str):
column_ids = [column_ids]
drop_ids = set()
for column_id in list(self.columns):
if column_id not in column_ids:
drop_ids.add(column_id)
return self.drop_columns(list(drop_ids), inplace=inplace)
def pick_index(
self,
index: pd.Index,
*,
inplace: bool = False,
) -> typing.Self:
r"""Pick rows from index.
Args:
index: index object
inplace: pick index in place
Returns:
new object if ``inplace=False``, otherwise ``self``
Raises:
ValueError: if level and dtypes of index does not match table index
"""
table = self if inplace else self.copy()
index = _maybe_convert_dtype_to_string(index)
_assert_table_index(table, index, "pick rows from")
new_index = utils.intersect([table.index, index])
table._df = table.df.reindex(new_index)
if inplace:
_maybe_update_scheme(table)
return table
def save(
self,
path: str,
*,
storage_format: str = define.TableStorageFormat.CSV,
update_other_formats: bool = True,
):
r"""Save table data to disk.
Existing files will be overwritten.
Args:
path: file path without extension
storage_format: storage format of table.
See :class:`audformat.define.TableStorageFormat`
for available formats
update_other_formats: if ``True`` it will not only save
to the given ``storage_format``,
but update all files stored in other storage formats as well
"""
path = audeer.path(path)
define.TableStorageFormat._assert_has_attribute_value(storage_format)
pickle_file = path + f".{define.TableStorageFormat.PICKLE}"
csv_file = path + f".{define.TableStorageFormat.CSV}"
# Make sure the CSV file is always written first
# as it is expected to be older by load()
if storage_format == define.TableStorageFormat.PICKLE:
if update_other_formats and os.path.exists(csv_file):
self._save_csv(csv_file)
self._save_pickled(pickle_file)
if storage_format == define.TableStorageFormat.CSV:
self._save_csv(csv_file)
if update_other_formats and os.path.exists(pickle_file):
self._save_pickled(pickle_file)
def set(
self,
values: typing.Union[
typing.Dict[str, Values],
pd.DataFrame,
],
*,
index: pd.Index = None,
):
r"""Set labels.
By default, all labels of the table are replaced,
use ``index`` to select a subset.
If a column is assigned to a :class:`Scheme`
values will be automatically converted
to match its dtype.
Examples are provided with the
:ref:`table specifications <data-tables:Tables>`.
Args:
values: dictionary of values with ``column_id`` as key
index: index
Raises:
ValueError: if values cannot be converted
to match the schemes dtype
"""
for idx, data in values.items():
self.columns[idx].set(data, index=index)
def update(
self,
others: typing.Union[typing.Self, typing.Sequence[typing.Self]],
*,
overwrite: bool = False,
) -> typing.Self:
r"""Update table with other table(s).
Table which calls ``update()``
to :ref:`combine tables <combine-tables>`
must be assigned to a database.
For all tables media and split must match.
Columns that are not yet part of the table will be added and
referenced schemes or raters are copied.
For overlapping columns, schemes and raters must match.
Columns with the same identifier are combined to a single column.
This requires that both columns have the same dtype
and if ``overwrite`` is set to ``False``,
values in places where the indices overlap have to match
or one column contains ``NaN``.
If ``overwrite`` is set to ``True``,
the value of the last table in the list is kept.
The index type of the table must not change.
Args:
others: table object(s)
overwrite: overwrite values where indices overlap
Returns:
the updated table
Raises:
RuntimeError: if table is not assign to a database
ValueError: if split or media does not match
ValueError: if overlapping columns reference different schemes
or raters
ValueError: if a missing scheme or rater cannot be copied
because a different object with the same ID exists
ValueError: if values in same position overlap
ValueError: if level and dtypes of table indices do not match
"""
if self.db is None:
raise RuntimeError("Table is not assigned to a database.")
others = audeer.to_list(others)
for other in others:
_assert_table_index(self, other.index, "update")
def raise_error(
msg,
left: typing.Optional[HeaderDict],
right: typing.Optional[HeaderDict],
):
raise ValueError(f"{msg}:\n" f"{left}\n" "!=\n" f"{right}")
def assert_equal(
msg: str,
left: typing.Optional[HeaderDict],
right: typing.Optional[HeaderDict],
):
equal = True
if left and right:
equal = left == right
elif left or right:
equal = False
if not equal:
raise_error(msg, left, right)
missing_schemes = {}
missing_raters = {}
for other in others:
assert_equal(
"Media of table " f"'{other._id}' " "does not match",
self.media,
other.media,
)
assert_equal(
"Split of table " f"'{other._id}' " "does not match",
self.split,
other.split,
)
# assert schemes match for overlapping columns and
# look for missing schemes in new columns,
# raise an error if a different scheme with same ID exists
for column_id, column in other.columns.items():
if column_id in self.columns:
assert_equal(
"Scheme of common column "
f"'{other._id}.{column_id}' "
"does not match",
self.columns[column_id].scheme,
column.scheme,
)
else:
if column.scheme is not None:
if column.scheme_id in self.db.schemes:
assert_equal(
"Cannot copy scheme of column "
f"'{other._id}.{column_id}' "
"as a different scheme with ID "
f"'{column.scheme_id}' "
"exists",
self.db.schemes[column.scheme_id],
column.scheme,
)
else:
missing_schemes[column.scheme_id] = column.scheme
# assert raters match for overlapping columns and
# look for missing raters in new columns,
# raise an error if a different rater with same ID exists
for column_id, column in other.columns.items():
if column_id in self.columns:
assert_equal(
f"self['{self._id}']['{column_id}'].rater "
"does not match "
f"other['{other._id}']['{column_id}'].rater",
self.columns[column_id].rater,
column.rater,
)
else:
if column.rater is not None:
if column.rater_id in self.db.raters:
assert_equal(
f"db1.raters['{column.scheme_id}'] "
"does not match "
f"db2.raters['{column.scheme_id}']",
self.db.raters[column.rater_id],
column.rater,
)
else:
missing_raters[column.rater_id] = column.rater
# concatenate table data
df = utils.concat(
[self.df] + [other.df for other in others],
overwrite=overwrite,
)
# insert missing schemes and raters
for scheme_id, scheme in missing_schemes.items():
self.db.schemes[scheme_id] = copy.copy(scheme)
for rater_id, rater in missing_raters.items():
self.db.raters[rater_id] = copy.copy(rater)
# insert new columns
for other in others:
for column_id, column in other.columns.items():
if column_id not in self.columns:
self.columns[column_id] = copy.copy(column)
# update table data
self._df = df
return self
def _get_by_index(
self,
index: pd.Index,
) -> (pd.DataFrame, bool): # pragma: no cover
# Executed when calling `self.get(index=index)`.
# Returns `df, df_is_copy`
raise NotImplementedError()
def _load_csv(self, path: str):
schemes = self.db.schemes
converters = {}
dtypes = {}
if hasattr(self, "type"):
# filewise or segmented table
dtypes[define.IndexField.FILE] = define.DataType.STRING
if self.type == define.IndexType.SEGMENTED:
dtypes[define.IndexField.START] = define.DataType.TIME
dtypes[define.IndexField.END] = define.DataType.TIME
else:
# misc table
dtypes = self.levels
# index columns
levels = list(dtypes)
dtypes = {level: to_pandas_dtype(dtype) for level, dtype in dtypes.items()}
# other columns
columns = list(self.columns)
for column_id, column in self.columns.items():
if column.scheme_id is not None:
dtypes[column_id] = schemes[column.scheme_id].to_pandas_dtype()
else:
dtypes[column_id] = "object"
# replace dtype with converter for dates or timestamps
dtypes_wo_converters = {}
for column_id, dtype in dtypes.items():
if dtype == "datetime64[ns]":
converters[column_id] = lambda x: pd.to_datetime(x)
elif dtype == "timedelta64[ns]":
converters[column_id] = lambda x: pd.to_timedelta(x)
else:
dtypes_wo_converters[column_id] = dtype
# read csv
df = pd.read_csv(
path,
usecols=levels + columns,
dtype=dtypes_wo_converters,
index_col=levels,
converters=converters,
float_precision="round_trip",
)
# For an empty CSV file
# converters will not set the correct dtype
# and we need to correct it manually
if len(df) == 0:
# fix index
converter_dtypes = {
level: dtype
for level, dtype in dtypes.items()
if level in converters and level in levels
}
df.index = utils.set_index_dtypes(df.index, converter_dtypes)
# fix columns
for column_id in columns:
if column_id in converters:
dtype = dtypes[column_id]
df[column_id] = df[column_id].astype(dtype)
self._df = df
def _load_pickled(self, path: str):
# Older versions of audformat used xz compression
# which produced smaller files,
# but was slower.
# The try-except statement allows backward compatibility
try:
df = pd.read_pickle(path)
except pickle.UnpicklingError:
df = pd.read_pickle(path, compression="xz")
# Older versions of audformat stored columns
# assigned to a string scheme as 'object',
# so we need to convert those to 'string'
for column_id, column in self.columns.items():
if (
column.scheme_id is not None
and (self.db.schemes[column.scheme_id].dtype == define.DataType.STRING)
and df[column_id].dtype == "object"
):
df[column_id] = df[column_id].astype("string", copy=False)
# Fix index entries as well
df.index = _maybe_convert_dtype_to_string(df.index)
self._df = df
def _save_csv(self, path: str):
# Load table before opening CSV file
# to avoid creating a CSV file
# that is newer than the PKL file
df = self.df
with open(path, "w") as fp:
df.to_csv(fp, encoding="utf-8")
def _save_pickled(self, path: str):
self.df.to_pickle(
path,
protocol=4, # supported by Python >= 3.4
)
def _set_column(self, column_id: str, column: Column) -> Column:
levels = (
self.index.names
if isinstance(self.index, pd.MultiIndex)
else [self.index.name]
)
if column_id in levels:
raise ValueError(
f"Cannot add column with ID "
f"'{column_id}' "
f"when there is an "
f"index level with same name. "
f"Level names are: "
f"{levels}."
)
if column.scheme_id is not None and column.scheme_id not in self.db.schemes:
raise BadIdError("column", column.scheme_id, self.db.schemes)
if column.rater_id is not None and column.rater_id not in self.db.raters:
raise BadIdError("rater", column.rater_id, self.db.raters)
if column.scheme_id is not None:
dtype = self.db.schemes[column.scheme_id].to_pandas_dtype()
else:
dtype = object
self.df[column_id] = pd.Series(dtype=dtype)
column._id = column_id
column._table = self
return column
[docs]class MiscTable(Base):
r"""Miscellaneous table.
.. note:: Intended for use with tables
that have an index that is not conform to
:ref:`table specifications <data-tables:Tables>`.
Otherwise, use :class:`audformat.Table`.
To fill a table with labels,
add one or more :class:`audformat.Column`
and use :meth:`audformat.MiscTable.set` to set the values.
When adding a column,
the column ID must be different
from the index level names.
When initialized with a single-level
:class:`pandas.MultiIndex`,
the index will be converted to a
:class:`pandas.Index`.
Args:
index: table index with non-empty and unique level names
split_id: split identifier (must exist)
media_id: media identifier (must exist)
description: database description
meta: additional meta fields
Raises:
ValueError: if level names of index are empty or not unique
Examples:
>>> index = pd.MultiIndex.from_tuples(
... [
... ("f1", "f2"),
... ("f1", "f3"),
... ("f2", "f3"),
... ],
... names=["file", "other"],
... )
>>> index = utils.set_index_dtypes(index, "string")
>>> table = MiscTable(
... index,
... split_id=define.SplitType.TEST,
... )
>>> table["match"] = Column()
>>> table
levels: {file: str, other: str}
split_id: test
columns:
match: {}
>>> table.get()
match
file other
f1 f2 NaN
f3 NaN
f2 f3 NaN
>>> table.set({"match": [True, False, True]})
>>> table.get()
match
file other
f1 f2 True
f3 False
f2 f3 True
>>> table.get(index[:2])
match
file other
f1 f2 True
f3 False
>>> index_new = pd.MultiIndex.from_tuples(
... [
... ("f4", "f1"),
... ],
... names=["file", "other"],
... )
>>> index_new = utils.set_index_dtypes(index_new, "string")
>>> table_ex = table.extend_index(
... index_new,
... inplace=False,
... )
>>> table_ex.get()
match
file other
f1 f2 True
f3 False
f2 f3 True
f4 f1 NaN
>>> table_ex.set(
... {"match": True},
... index=index_new,
... )
>>> table_ex.get()
match
file other
f1 f2 True
f3 False
f2 f3 True
f4 f1 True
>>> table_str = MiscTable(index)
>>> table_str["strings"] = Column()
>>> table_str.set({"strings": ["a", "b", "c"]})
>>> (table + table_str).get()
match strings
file other
f1 f2 True a
f3 False b
f2 f3 True c
>>> (table_ex + table_str).get()
match strings
file other
f1 f2 True a
f3 False b
f2 f3 True c
f4 f1 True NaN
"""
def __init__(
self,
index: pd.Index,
*,
split_id: str = None,
media_id: str = None,
description: str = None,
meta: dict = None,
):
self.levels = None
r"""Index levels."""
if index is not None:
# convert single-level pd.MultiIndex to pd.Index
if isinstance(index, pd.MultiIndex) and index.nlevels == 1:
index = index.get_level_values(0)
# Ensure integers are always stored as Int64
index = utils._maybe_convert_int_dtype(index)
levels = utils._levels(index)
if not all(levels) or len(levels) > len(set(levels)):
raise ValueError(
f"Got index with levels "
f"{levels}, "
f"but names must be non-empty and unique."
)
dtypes = [to_audformat_dtype(dtype) for dtype in utils._dtypes(index)]
self.levels = {level: dtype for level, dtype in zip(levels, dtypes)}
super().__init__(
index,
split_id=split_id,
media_id=media_id,
description=description,
meta=meta,
)
def _get_by_index(self, index: pd.Index) -> pd.DataFrame:
return self.df.loc[index]
[docs]class Table(Base):
r"""Table conform to :ref:`table specifications <data-tables:Tables>`.
Consists of a list of file names to which it assigns
numerical values or labels.
To fill a table with labels,
add one or more :class:`audformat.Column`
and use :meth:`audformat.Table.set` to set the values.
When adding a column,
the column ID must be different
from the index level names,
which are ``'file'``
in case of a ``filewise`` table
and ``'file'``, ``'start'`` and ``'end'``
in case of ``segmented`` table.
Args:
index: index conform to
:ref:`table specifications <data-tables:Tables>`.
If ``None`` creates an empty filewise table
split_id: split identifier (must exist)
media_id: media identifier (must exist)
description: database description
meta: additional meta fields
Raises:
ValueError: if index not conform to
:ref:`table specifications <data-tables:Tables>`
Examples:
>>> index = filewise_index(["f1", "f2", "f3"])
>>> table = Table(
... index,
... split_id=define.SplitType.TEST,
... )
>>> table["values"] = Column()
>>> table
type: filewise
split_id: test
columns:
values: {}
>>> table.get()
values
file
f1 NaN
f2 NaN
f3 NaN
>>> table.set({"values": [0, 1, 2]})
>>> table.get()
values
file
f1 0
f2 1
f3 2
>>> table.get(index[:2])
values
file
f1 0
f2 1
>>> table.get(as_segmented=True)
values
file start end
f1 0 days NaT 0
f2 0 days NaT 1
f3 0 days NaT 2
>>> index_new = filewise_index("f4")
>>> table_ex = table.extend_index(
... index_new,
... inplace=False,
... )
>>> table_ex.get()
values
file
f1 0
f2 1
f3 2
f4 NaN
>>> table_ex.set(
... {"values": 3},
... index=index_new,
... )
>>> table_ex.get()
values
file
f1 0
f2 1
f3 2
f4 3
>>> table_str = Table(index)
>>> table_str["strings"] = Column()
>>> table_str.set({"strings": ["a", "b", "c"]})
>>> (table + table_str).get()
values strings
file
f1 0 a
f2 1 b
f3 2 c
>>> (table_ex + table_str).get()
values strings
file
f1 0 a
f2 1 b
f3 2 c
f4 3 NaN
"""
def __init__(
self,
index: pd.Index = None,
*,
split_id: str = None,
media_id: str = None,
description: str = None,
meta: dict = None,
):
if index is None:
index = filewise_index()
index = _maybe_convert_dtype_to_string(index)
self.type = index_type(index)
r"""Table type
See :class:`audformat.define.IndexType`
for possible values.
"""
super().__init__(
index,
split_id=split_id,
media_id=media_id,
description=description,
meta=meta,
)
@property
def ends(self) -> pd.Index:
r"""Segment end times.
Returns:
timestamps
"""
if self.is_segmented:
return self.df.index.get_level_values(define.IndexField.END)
else:
return utils.to_segmented_index(self.df.index).get_level_values(
define.IndexField.END
)
@property
def files(self) -> pd.Index:
r"""Files referenced in the table.
Returns:
files
"""
# We use len() here as self.df.index.empty takes a very long time
if len(self.df.index) == 0:
return filewise_index()
else:
index = self.df.index.get_level_values(define.IndexField.FILE)
index.name = define.IndexField.FILE
return index
@property
def is_filewise(self) -> bool:
r"""Check if filewise table.
Returns:
``True`` if filewise table.
"""
return self.type == define.IndexType.FILEWISE
@property
def is_segmented(self) -> bool:
r"""Check if segmented table.
Returns:
``True`` if segmented table.
"""
return self.type == define.IndexType.SEGMENTED
@property
def starts(self) -> pd.Index:
r"""Segment start times.
Returns:
timestamps
"""
if self.is_segmented:
return self.df.index.get_level_values(define.IndexField.START)
else:
return utils.to_segmented_index(self.df.index).get_level_values(
define.IndexField.START
)
[docs] def drop_files(
self,
files: typing.Union[
str,
typing.Sequence[str],
typing.Callable[[str], bool],
],
*,
inplace: bool = False,
) -> Table:
r"""Drop files.
Remove rows with a reference to listed or matching files.
Args:
files: list of files or condition function
inplace: drop files in place
Returns:
new object if ``inplace=False``, otherwise ``self``
"""
if not inplace:
return self.copy().drop_files(files, inplace=True)
if isinstance(files, str):
files = [files]
if callable(files):
sel = self.files.to_series().apply(files)
self._df = self.df[~sel.values]
else:
index = self.files.intersection(files)
index.name = define.IndexField.FILE
if self.is_segmented:
level = "file"
else:
level = None
self.df.drop(index, inplace=True, level=level)
return self
[docs] def get(
self,
index: pd.Index = None,
*,
map: typing.Dict[str, typing.Union[str, typing.Sequence[str]]] = None,
copy: bool = True,
as_segmented: bool = False,
allow_nat: bool = True,
root: str = None,
num_workers: typing.Optional[int] = 1,
verbose: bool = False,
) -> pd.DataFrame:
r"""Get labels.
By default, all labels of the table are returned,
use ``index`` to get a subset.
Examples are provided with the
:ref:`table specifications <data-tables:Tables>`.
Args:
index: index conform to
:ref:`table specifications <data-tables:Tables>`
copy: return a copy of the labels
map: :ref:`map scheme or scheme fields to column values
<map-scheme-labels>`.
For example if your table holds a column ``speaker`` with
speaker IDs, which is assigned to a scheme that contains a
dict mapping speaker IDs to age and gender entries,
``map={'speaker': ['age', 'gender']}``
will replace the column with two new columns that map ID
values to age and gender, respectively.
To also keep the original column with speaker IDS, you can do
``map={'speaker': ['speaker', 'age', 'gender']}``
as_segmented: if set to ``True``
and table has a filewise index,
the index of the returned table
will be converted to a segmented index.
``start`` will be set to ``0`` and
``end`` to ``NaT`` or to the file duration
if ``allow_nat`` is set to ``False``
allow_nat: if set to ``False``,
``end=NaT`` is replaced with file duration
root: root directory under which the files are stored.
Provide if file names are relative and
database was not saved or loaded from disk.
If ``None`` :attr:`audformat.Database.root` is used.
Only relevant if ``allow_nat`` is set to ``False``
num_workers: number of parallel jobs.
If ``None`` will be set to the number of processors
on the machine multiplied by 5
verbose: show progress bar
Returns:
labels
Raises:
FileNotFoundError: if file is not found
RuntimeError: if table is not assign to a database
ValueError: if trying to map without a scheme
ValueError: if trying to map from a scheme that has no labels
ValueError: if trying to map to a non-existing field
"""
result = super().get(index, map=map, copy=copy)
# if necessary, convert to segmented index and replace NaT
is_segmented = is_segmented_index(result.index)
if (not is_segmented and as_segmented) or (is_segmented and not allow_nat):
files_duration = None
if self.db is not None:
files_duration = self.db._files_duration
root = root or self.db.root
new_index = utils.to_segmented_index(
result.index,
allow_nat=allow_nat,
files_duration=files_duration,
root=root,
num_workers=num_workers,
verbose=verbose,
)
result = result.set_axis(new_index)
return result
[docs] def map_files(
self,
func: typing.Callable[[str], str],
):
r"""Apply function to file names in table.
If speed is crucial,
see :func:`audformat.utils.map_file_path`
for further hints how to optimize your code.
Args:
func: map function
"""
self.df.index = utils.map_file_path(self.df.index, func)
[docs] def pick_files(
self,
files: typing.Union[
str,
typing.Sequence[str],
typing.Callable[[str], bool],
],
*,
inplace: bool = False,
) -> Table:
r"""Pick files.
Keep only rows with a reference to listed files or matching files.
Args:
files: list of files or condition function
inplace: pick files in place
Returns:
new object if ``inplace=False``, otherwise ``self``
"""
if not inplace:
return self.copy().pick_files(files, inplace=True)
if isinstance(files, str):
files = [files]
if callable(files):
sel = self.files.to_series().apply(files)
self._df = self.df[sel.values]
else:
index = self.files.intersection(files)
index.name = define.IndexField.FILE
self._df = self.get(index, copy=False)
return self
def _get_by_index(
self,
index: pd.Index,
) -> pd.DataFrame:
if index_type(self.index) == index_type(index):
result = self.df.loc[index]
else:
files = index.get_level_values(define.IndexField.FILE)
if self.is_filewise: # index is segmented
result = self.df.loc[files]
result.index = index
else: # index is filewise
files = list(dict.fromkeys(files)) # remove duplicates
result = self.df.loc[files]
return result
def _assert_table_index(
table: Base,
index: pd.Index,
operation: str,
):
r"""Raise error if index does not match table."""
if isinstance(table, Table):
input_type = index_type(index)
if table.type != input_type:
raise ValueError(
f"Cannot "
f"{operation} "
f"a "
f"{table.type} "
f"table with a "
f"{input_type} "
f"index."
)
elif not utils.is_index_alike([table.index, index]):
want = (
index.dtypes
if isinstance(index, pd.MultiIndex)
else pd.Series(index.dtype, pd.Index([index.name]))
)
want = "\n\t".join(want.to_string().split("\n"))
got = (
table.index.dtypes
if isinstance(table.index, pd.MultiIndex)
else pd.Series(table.index.dtype, pd.Index([table.index.name]))
)
got = "\n\t".join(got.to_string().split("\n"))
raise ValueError(
f"Cannot "
f"{operation} "
f"table if input index and table index are not alike.\n"
f"Expected index:\n"
f"\t{want}"
f"\nbut yours is:\n"
f"\t{got}"
)
def _maybe_convert_dtype_to_string(
index: pd.Index,
) -> pd.Index:
r"""Possibly set dtype of file level to 'string'."""
if (is_filewise_index(index) and index.dtype == "object") or (
is_segmented_index(index) and index.dtypes[define.IndexField.FILE] == "object"
):
index = utils.set_index_dtypes(
index,
{define.IndexField.FILE: "string"},
)
return index
def _maybe_update_scheme(
table: Base,
):
r"""Replace labels if table is used in a scheme."""
if table.db is not None and isinstance(table, MiscTable):
for scheme in table.db.schemes.values():
if table._id == scheme.labels:
scheme.replace_labels(table._id)