Source code for audformat.core.testing

import os
import random
from typing import Callable
from typing import Dict
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Union

import numpy as np
import pandas as pd

import audeer
import audiofile as af

from audformat.core import define
from audformat.core.attachment import Attachment
from audformat.core.column import Column
from audformat.core.database import Database
from audformat.core.index import filewise_index
from audformat.core.index import is_filewise_index
from audformat.core.index import is_segmented_index
from audformat.core.index import segmented_index
from audformat.core.media import Media
from audformat.core.rater import Rater
from audformat.core.scheme import Scheme
from audformat.core.split import Split
from audformat.core.table import MiscTable
from audformat.core.table import Table


def add_misc_table(
    db: Database,
    table_id: str,
    index: pd.Index,
    *,
    columns: Union[
        str,
        Sequence[str],
        Dict[str, Union[str, Tuple[Optional[str], Optional[str]]]],
    ] = None,
    p_none: float = None,
    split_id: str = None,
    media_id: str = None,
) -> MiscTable:
    r"""Create a miscellaneous table and fill it with random values.

    A :class:`MiscTable` built on ``index`` is stored in ``db``
    under ``table_id``.
    Unless ``columns`` is given,
    one column per scheme of the database is added.

    Args:
        db: a database
        table_id: ID of table that will be created
        index: index object
        columns: a list of scheme_ids or a dictionary with column names as
            keys and tuples of ``(scheme_id, rater_id)`` as values.
            ``None`` values are allowed
        p_none: probability to draw ``None``
        split_id: optional split ID
        media_id: optional media ID

    Returns:
        table object

    """
    db[table_id] = MiscTable(index, split_id=split_id, media_id=media_id)
    # Work on the table as stored in the database,
    # not on a detached object
    table = db[table_id]
    _add_columns(db, table, columns, len(index), p_none)
    return table
def add_table(
    db: Database,
    table_id: str,
    index_type: str,
    *,
    columns: Union[
        str,
        Sequence[str],
        Dict[str, Union[str, Tuple[Optional[str], Optional[str]]]],
    ] = None,
    num_files: Union[int, Sequence[int]] = 5,
    num_segments_per_file: int = 5,
    file_duration: Union[str, pd.Timedelta] = "5s",
    file_root: str = "audio",
    p_none: float = None,
    split_id: str = None,
    media_id: str = None,
) -> Table:
    r"""Add a table with random values.

    By default, adds one column for every scheme in the database.
    To create a specific set of columns use ``columns``.
    If a ``media_id`` is passed,
    the file format will be determined from there.
    Otherwise, WAV is used.

    Args:
        db: a database
        table_id: ID of table that will be created
        index_type: the index type, see :class:`audformat.define.IndexType`
            for available index types
        columns: a list of scheme_ids or a dictionary with column names as
            keys and tuples of ``(scheme_id, rater_id)`` as values.
            ``None`` values are allowed
        num_files: by default files are named ``'001'``, ``'002'``, etc.
            up to the number of files.
            For a different ordering a sequence of integers can be passed
        num_segments_per_file: number of segments per file
            (only applies to segmented table)
        file_duration: the file duration
        file_root: file sub directory
        p_none: probability to draw ``None``
        split_id: optional split ID
        media_id: optional media ID

    Returns:
        table object

    Raises:
        ValueError: if ``index_type`` is neither
            ``define.IndexType.FILEWISE``
            nor ``define.IndexType.SEGMENTED``

    """
    if isinstance(file_duration, str):
        file_duration = pd.Timedelta(file_duration)

    # The media entry, if it declares a format, overrides the WAV default
    audio_format = "wav"
    if media_id and db.media[media_id].format:
        audio_format = db.media[media_id].format

    if isinstance(num_files, int):
        # Consecutive numbering starting at 001
        files = [
            f"{file_root}/{idx + 1:03}.{audio_format}" for idx in range(num_files)
        ]
    else:
        # Caller provided the file numbers (and their order) explicitly
        files = [f"{file_root}/{idx:03}.{audio_format}" for idx in num_files]
        num_files = len(num_files)

    if index_type == define.IndexType.FILEWISE:
        n_items = num_files
        db[table_id] = Table(
            filewise_index(files),
            split_id=split_id,
            media_id=media_id,
        )

    elif index_type == define.IndexType.SEGMENTED:
        n_items = num_files * num_segments_per_file
        starts = []
        ends = []
        new_files = []
        for file in files:
            # Draw two random time stamps per segment and sort them,
            # so that consecutive pairs form non-overlapping
            # (start, end) intervals within the file duration
            times = [
                pd.to_timedelta(random.random() * file_duration, unit="s")
                for _ in range(num_segments_per_file * 2)
            ]
            times.sort()
            starts.extend(times[::2])
            ends.extend(times[1::2])
            new_files.extend([file] * num_segments_per_file)
        db[table_id] = Table(
            segmented_index(new_files, starts=starts, ends=ends),
            split_id=split_id,
            media_id=media_id,
        )

    else:
        # Without this branch no table is created and ``n_items`` stays
        # unbound, which surfaced as an unrelated UnboundLocalError
        raise ValueError(
            f"Unsupported index_type '{index_type}', "
            f"expected '{define.IndexType.FILEWISE}' "
            f"or '{define.IndexType.SEGMENTED}'."
        )

    _add_columns(db, db[table_id], columns, n_items, p_none)

    return db[table_id]
def create_attachment_files(
    db: Database,
    root: str,
):
    r"""Create the files and folders referenced by database attachments.

    An attachment path whose basename contains a dot (``.``)
    is treated as a file:
    its parent folder is created and the file is touched.
    Any other path is created as a directory.
    Paths that already exist are left untouched.

    Args:
        db: a database
        root: root folder of database

    """
    for attachment_id in list(db.attachments):
        path = audeer.path(root, db.attachments[attachment_id].path)
        if os.path.exists(path):
            continue

        folder, basename = os.path.split(path)
        if "." not in basename:
            # No dot in the name: the attachment is a directory
            audeer.mkdir(path)
        else:
            # The attachment is a file, make sure its folder exists first
            audeer.mkdir(folder)
            audeer.touch(path)
def create_audio_files(
    db: Database,
    *,
    sample_generator: Callable[[float], float] = None,
    sampling_rate: int = 16000,
    channels: int = 1,
    file_duration: Union[str, pd.Timedelta] = "60s",
):
    r"""Write an audio file for every file listed in a database.

    Without a sample generator
    each file contains only zeros.
    If ``sample_generator`` is given,
    it is called once per channel and sample
    with the time stamp of the sample in seconds
    and should return a value in the amplitude range ``[-1..1]``.

    Args:
        db: a database
        sample_generator: sample generator
        sampling_rate: sampling rate in Hz
        channels: number of channels
        file_duration: file duration

    Raises:
        RuntimeError: if databases was not saved yet
        RuntimeError: if database is not portable

    """
    if db.root is None:  # pragma: no cover
        raise RuntimeError("Cannot create files if databases was not saved.")
    if not db.is_portable:  # pragma: no cover
        raise RuntimeError("Cannot create files if databases is not portable.")

    file_duration = pd.to_timedelta(file_duration)
    # Number of samples per channel, identical for all files
    num_samples = int(file_duration.total_seconds() * sampling_rate)

    for file in db.files:
        path = os.path.join(db.root, file)
        folder = os.path.dirname(path)
        if not os.path.exists(folder):
            os.makedirs(folder)

        signal = np.zeros((channels, num_samples))
        if sample_generator:  # pragma: no cover
            timestamps = np.arange(num_samples) / sampling_rate
            # The generator is evaluated anew for every file and channel,
            # as it is not guaranteed to be deterministic
            for channel in signal:
                for idx, timestamp in enumerate(timestamps):
                    channel[idx] = sample_generator(timestamp)

        af.write(path, signal, sampling_rate)
def create_db(
    minimal: bool = False,
    data: Dict[str, Union[pd.Series, pd.DataFrame]] = None,
) -> Database:
    r"""Create test database.

    Builds a database named ``unittest``.
    Depending on the arguments one of three variants is returned:

    * ``minimal=True``: only the database head, without tables
    * ``data`` given: head plus one table per entry in ``data``,
      with one scheme per detected data type
      (integer, float or string)
    * otherwise: a full database with attachments, media, raters,
      schemes, splits, a miscellaneous table ``'misc'``,
      a filewise table ``'files'``
      and a segmented table ``'segments'``,
      all filled with random values

    Args:
        minimal: create minimal database without tables
        data: create tables from pandas objects

    Returns:
        database object

    """
    # --- Head ---------------------------------------------------------------
    db = Database(
        name="unittest",
        source="internal",
        usage=define.Usage.UNRESTRICTED,
        languages=["de", "English"],
    )
    if minimal:
        return db

    db.description = "A database for unit testing."
    db.author = "J. Wagner, H. Wierstorf"
    db.organization = "audEERING GmbH"
    db.license = define.License.CC0_1_0
    db.meta["audformat"] = "https://github.com/audeering/audformat"

    # --- Tables from user data ----------------------------------------------
    if data is not None:

        def to_scheme_type(series: pd.Series) -> str:
            # Map the pandas dtype name to a scheme data type,
            # everything that is not int or float is handled as string
            dtype_name = series.dtype.name
            if dtype_name.startswith("int"):
                return define.DataType.INTEGER
            if dtype_name.startswith("float"):
                return define.DataType.FLOAT
            return define.DataType.STRING

        for table_id, frame in data.items():
            if isinstance(frame, pd.Series):
                frame = frame.to_frame()
            # Filewise and segmented indices end up in a regular table,
            # any other index in a miscellaneous table
            has_file_index = is_filewise_index(frame) or is_segmented_index(frame)
            table_class = Table if has_file_index else MiscTable
            db[table_id] = table_class(frame.index)
            for column_id, values in frame.items():
                scheme_id = to_scheme_type(values)
                if scheme_id not in db.schemes:
                    db.schemes[scheme_id] = Scheme(scheme_id)
                db[table_id][column_id] = Column(scheme_id=scheme_id)
            db[table_id].set(frame)

        return db

    # --- Attachments --------------------------------------------------------
    db.attachments["file"] = Attachment("extra/file.txt")
    db.attachments["folder"] = Attachment("extra/folder")

    # --- Media --------------------------------------------------------------
    db.media["microphone"] = Media(
        format="wav",
        sampling_rate=16000,
        channels=1,
        bit_depth=16,
    )
    db.media["webcam"] = Media(
        format="avi",
        video_fps=25,
        video_resolution=[800, 600],
        video_depth=8,
        video_channels=3,
    )

    # --- Raters -------------------------------------------------------------
    db.raters["gold"] = Rater(
        description="Gold standard by taking the average ratings."
    )
    db.raters["machine"] = Rater(
        type=define.RaterType.MACHINE,
        description="Predictions made by the machine.",
        meta={"features": "ComParE_2016", "classifier": "LibSVM"},
    )

    # --- Schemes ------------------------------------------------------------
    # The insertion order defines the column order of the tables below
    db.schemes["bool"] = Scheme(dtype=define.DataType.BOOL)
    db.schemes["date"] = Scheme(dtype=define.DataType.DATE)
    db.schemes["float"] = Scheme(
        dtype=define.DataType.FLOAT,
        minimum=-1.0,
        maximum=1.0,
    )
    db.schemes["int"] = Scheme(
        dtype=define.DataType.INTEGER,
        minimum=0,
        maximum=100,
    )
    db.schemes["label"] = Scheme(labels=["label1", "label2", "label3"])
    db.schemes["label_map_int"] = Scheme(labels={1: "a", 2: "b", 3: "c"})
    db.schemes["label_map_str"] = Scheme(
        labels={
            "label1": {"prop1": 1, "prop2": "a"},
            "label2": {"prop1": 2, "prop2": "b"},
            "label3": {"prop1": 3, "prop2": "c"},
        }
    )
    db.schemes["string"] = Scheme()
    db.schemes["time"] = Scheme(dtype=define.DataType.TIME)

    # --- Misc table ---------------------------------------------------------
    labels = pd.Index(
        ["label1", "label2", "label3"],
        name="labels",
        dtype="string",
    )
    db["misc"] = MiscTable(labels)
    for scheme_id in ("int", "label"):
        db["misc"][scheme_id] = Column(scheme_id=scheme_id)
        db["misc"][scheme_id].set(db.schemes[scheme_id].draw(len(labels)))

    # --- Scheme with labels taken from the misc table -----------------------
    db.schemes["label_map_misc"] = Scheme(labels="misc", dtype="str")

    # --- Splits -------------------------------------------------------------
    split_types = (
        ("dev", define.SplitType.DEVELOP),
        ("test", define.SplitType.TEST),
        ("train", define.SplitType.TRAIN),
    )
    for split_id, split_type in split_types:
        db.splits[split_id] = Split(type=split_type)

    # --- Tables -------------------------------------------------------------
    # One column per scheme, all assigned to the 'gold' rater
    add_table(
        db,
        "files",
        define.IndexType.FILEWISE,
        columns={scheme_id: (scheme_id, "gold") for scheme_id in db.schemes},
        num_files=100,
        p_none=0.25,
        split_id="train",
        media_id="microphone",
    )
    db["files"]["no_scheme"] = Column()
    db["files"]["no_scheme"].set(db.schemes["string"].draw(100, p_none=0.25))

    add_table(
        db,
        "segments",
        define.IndexType.SEGMENTED,
        columns={scheme_id: (scheme_id, "gold") for scheme_id in db.schemes},
        num_files=10,
        num_segments_per_file=10,
        file_duration="60s",
        p_none=0.25,
        split_id="dev",
        media_id="microphone",
    )
    db["segments"]["no_scheme"] = Column()
    db["segments"]["no_scheme"].set(db.schemes["string"].draw(100, p_none=0.25))

    return db
def _add_columns(
    db: Database,
    table: Union[Table, MiscTable],
    columns: Optional[
        Union[
            str,
            Sequence[str],
            Dict[str, Union[str, Tuple[Optional[str], Optional[str]]]],
        ]
    ],
    n_items: int,
    p_none: Optional[float],
):
    r"""Add columns with random values to a table.

    The ``columns`` argument of ``add_table()`` / ``add_misc_table()``
    is first normalized to a dictionary
    mapping column ID to ``(scheme_id, rater_id)``.
    Afterwards one column per entry is added to ``table``
    and, if it has a scheme,
    filled with values drawn from that scheme.

    Args:
        db: database providing the schemes
        table: table the columns are added to
        columns: column specification.
            ``None`` selects one column per scheme of ``db``,
            a string or a sequence of strings is interpreted as scheme IDs
            that are also used as column IDs,
            a dictionary maps column IDs
            to a scheme ID or to a ``(scheme_id, rater_id)`` tuple
        n_items: number of values to draw per column
        p_none: probability to draw ``None``

    """
    if columns is None:
        columns = {scheme_id: (scheme_id, None) for scheme_id in db.schemes}
    elif isinstance(columns, str):
        columns = {columns: (columns, None)}
    elif isinstance(columns, Sequence):
        columns = {scheme_id: (scheme_id, None) for scheme_id in columns}

    for column_id, spec in columns.items():
        if isinstance(spec, str):
            # A bare string is a scheme ID without rater.
            # Unpacking it like a tuple would fail
            # or, for two-character IDs, silently split the string
            scheme_id, rater_id = spec, None
        else:
            scheme_id, rater_id = spec
        table[column_id] = Column(
            scheme_id=scheme_id,
            rater_id=rater_id,
        )
        # Columns without a scheme have nothing to draw from and stay empty
        if scheme_id is not None:
            table[column_id].set(db.schemes[scheme_id].draw(n_items, p_none=p_none))