import collections
import re
import typing
import numpy as np
from audmath.core.utils import polyval
VALUE_UNIT_PATTERN = re.compile(
"^ *" # space
"(" # 1st group: value
"[\\-\\+]?[0-9]*[.]?[0-9]*"
")"
" *" # space
"(" # 2nd group: unit
"[a-zA-Zμ]*"
")"
" *$" # space
)
WINDOW_SHAPES = [
"tukey",
"kaiser",
"linear",
"exponential",
"logarithmic",
]
[docs]def db(
x: typing.Union[int, float, typing.Sequence, np.ndarray],
*,
bottom: typing.Union[int, float] = -120,
) -> typing.Union[np.floating, np.ndarray]:
r"""Convert value to decibels.
The decibel of a value :math:`x \in \R`
is given by
.. math::
\text{db}(x) = \begin{cases}
20 \log_{10} x,
& \text{if } x > 10^\frac{\text{bottom}}{20} \\
\text{bottom},
& \text{else}
\end{cases}
where :math:`\text{bottom}` is provided
by the argument of same name.
Args:
x: input value(s)
bottom: minimum decibel value
returned for very low input values.
If set to ``None``
it will return ``-np.Inf``
for values equal or less than 0
Returns:
input value(s) in dB
Examples:
>>> db(1)
np.float64(0.0)
>>> db(0)
np.float64(-120.0)
>>> db(2)
np.float64(6.020599913279624)
>>> db([0, 1])
array([-120., 0.])
"""
if bottom is None:
min_value = 0
bottom = -np.inf
else:
bottom = np.float64(bottom)
min_value = 10 ** (bottom / 20)
if not isinstance(x, (collections.abc.Sequence, np.ndarray)):
if x <= min_value:
return bottom
else:
return 20 * np.log10(x)
x = np.array(x)
if x.size == 0:
return x
if not np.issubdtype(x.dtype, np.floating):
x = x.astype(np.float64)
mask = x <= min_value
x[mask] = bottom
x[~mask] = 20 * np.log10(x[~mask])
return x
[docs]def duration_in_seconds(
duration: typing.Optional[typing.Union[float, int, str, np.timedelta64]],
sampling_rate: typing.Union[float, int] = None,
) -> np.floating:
r"""Duration in seconds.
Converts the given duration value to seconds.
A unit can be provided
when ``duration`` is given as a string.
As units the following values are possible.
.. table::
==================================================== ===========
Unit Meaning
==================================================== ===========
W week
D, days, day day
h, hours, hour, hr hour
m, minutes, minute, min, T minute
s, seconds, second, sec, S second
ms, milliseconds, millisecond, millis, milli, L millisecond
us, μs, microseconds, microsecond, micros, micro, U microsecond
ns, nanoseconds, nanoseconds, nanos, nano, N nanosecond
==================================================== ===========
.. _numpy's datetime units: https://numpy.org/doc/stable/reference/arrays.datetime.html#datetime-units
Args:
duration: if ``duration`` is
a float,
integer
or string without unit
it is treated as seconds
or if ``sampling_rate`` is provided
as samples.
If ``duration`` is provided as a string with unit,
e.g. ``'2ms'`` or ``'2 ms'``,
or as a :class:`numpy.timedelta64`
or :class:`pandas.Timedelta` object
it will be converted to seconds
and ``sampling_rate`` is always ignored.
If duration is
``None``,
:obj:`numpy.nan`,
:obj:`pandas.NA`,
:obj:`pandas.NaT`,
``''``,
``'None'``,
``'NaN'``,
``'NaT'``,
or any other lower/mixed case version of those strings
:obj:`numpy.nan` is returned.
If duration is
:obj:`numpy.inf`,
``'Inf'``
or any other lower/mixed case version of that string
:obj:`numpy.inf` is returned,
and ``-``:obj:`numpy.inf` for the negative case
sampling_rate: sampling rate in Hz.
Is ignored
if duration is provided with a unit
Returns:
duration in seconds
Raises:
ValueError: if the provided unit is not supported
ValueError: if ``duration`` is a string
that does not match a valid '<value><unit>' pattern
Examples:
>>> duration_in_seconds(2)
np.float64(2.0)
>>> duration_in_seconds(2.0)
np.float64(2.0)
>>> duration_in_seconds("2")
np.float64(2.0)
>>> duration_in_seconds("2ms")
np.float64(0.002)
>>> duration_in_seconds("2 ms")
np.float64(0.002)
>>> duration_in_seconds("ms")
np.float64(0.001)
>>> duration_in_seconds(2000, sampling_rate=1000)
np.float64(2.0)
>>> duration_in_seconds(np.timedelta64(2, "s"))
np.float64(2.0)
>>> duration_in_seconds(pd.to_timedelta(2, "s"))
np.float64(2.0)
>>> duration_in_seconds("Inf")
inf
>>> duration_in_seconds(None)
nan
""" # noqa: E501
# Dictionary with allowed unit entries
# and mapping from pandas.to_timedelta()
# to numpy.timedelta64, see
# https://numpy.org/doc/stable/reference/arrays.datetime.html#datetime-units
unit_mapping = {
# week
"W": "W",
# day
"D": "D",
"days": "D",
"day": "D",
# hour
"h": "h",
"hours": "h",
"hour": "h",
"hr": "h",
# minute
"m": "m",
"minutes": "m",
"minute": "m",
"min": "m",
"T": "m",
# second
"s": "s",
"seconds": "s",
"second": "s",
"sec": "s",
"S": "s",
# millisecond
"ms": "ms",
"milliseconds": "ms",
"millisecond": "ms",
"millis": "ms",
"milli": "ms",
"L": "ms",
# microsecond
"us": "us",
"μs": "us",
"microseconds": "us",
"microsecond": "us",
"micros": "us",
"micro": "us",
"U": "us",
# nanosecond
"ns": "ns",
"nanoseconds": "ns",
"nanosecond": "ns",
"nanos": "ns",
"nano": "ns",
"N": "ns",
}
# numpy.timedelta64() accepts only integer as input values,
# so we need to convert all values
# to nanoseconds and integers first
def to_nanos(value, unit):
if unit == "W":
value = value * 7 * 24 * 60 * 60 * 10**9
elif unit == "D":
value = value * 24 * 60 * 60 * 10**9
elif unit == "h":
value = value * 60 * 60 * 10**9
elif unit == "m":
value = value * 60 * 10**9
elif unit == "s":
value = value * 10**9
elif unit == "ms":
value = value * 10**6
elif unit == "us":
value = value * 10**3
return int(value)
if isinstance(duration, str):
# none/-inf/inf duration
if duration.lower() in ["", "none", "nan", "nat"]:
return np.nan
elif duration.lower() == "-inf":
return -np.inf
elif duration.lower() == "inf" or duration.lower() == "+inf":
return np.inf
# ensure we have a str and not numpy.str_
duration = str(duration)
match = re.match(VALUE_UNIT_PATTERN, duration)
if match is not None:
value, unit = match.groups()
if match is None or (not value and not unit):
raise ValueError(
f"Your given duration '{duration}' "
"is not conform to the <value><unit> pattern."
)
if not value or value == "+":
value = 1.0
elif value == "-":
value = -1.0
else:
value = float(value)
if not unit:
if sampling_rate is None:
duration = value
else:
duration = value / sampling_rate
else:
if unit not in unit_mapping:
raise ValueError(f"The provided unit '{unit}' is not known.")
unit = unit_mapping[unit]
# duration in nanoseconds
duration = np.timedelta64(to_nanos(value, unit), "ns")
# duration in seconds
duration = duration / np.timedelta64(1, "s")
elif isinstance(duration, np.timedelta64):
duration = duration / np.timedelta64(1, "s")
# support for pandas.Timedelta
# without dependency to pandas
elif duration.__class__.__name__ == "Timedelta":
duration = duration.total_seconds()
# handle nan/none durations
elif (
duration is None
or duration.__class__.__name__ == "NaTType"
or duration.__class__.__name__ == "NAType"
or np.isnan(duration)
):
return np.nan
elif sampling_rate is not None:
duration = duration / sampling_rate
return np.float64(duration)
[docs]def inverse_db(
y: typing.Union[int, float, typing.Sequence, np.ndarray],
*,
bottom: typing.Union[int, float] = -120,
) -> typing.Union[np.floating, np.ndarray]:
r"""Convert decibels to amplitude value.
The inverse of a value :math:`y \in \R`
provided in decibel
is given by
.. math::
\text{inverse\_db}(y) = \begin{cases}
10^\frac{y}{20},
& \text{if } y > \text{bottom} \\
0,
& \text{else}
\end{cases}
where :math:`\text{bottom}` is provided
by the argument of same name.
Args:
y: input signal in decibels
bottom: minimum decibel value
which should be converted.
Lower values will be set to 0.
If set to ``None``
it will return 0
only for input values of ``-np.Inf``
Returns:
input signal
Examples:
>>> inverse_db(0)
np.float64(1.0)
>>> inverse_db(-120)
np.float64(0.0)
>>> inverse_db(-3)
np.float64(0.7079457843841379)
>>> inverse_db([-120, 0])
array([0., 1.])
"""
min_value = np.float64(0.0)
if bottom is None:
bottom = -np.inf
if not isinstance(y, (collections.abc.Sequence, np.ndarray)):
if y <= bottom:
return min_value
else:
return np.power(10.0, y / 20.0)
y = np.array(y)
if y.size == 0:
return y
if not np.issubdtype(y.dtype, np.floating):
y = y.astype(np.float64)
mask = y <= bottom
y[mask] = min_value
y[~mask] = np.power(10.0, y[~mask] / 20.0)
return y
[docs]def inverse_normal_distribution(
y: typing.Union[int, float, typing.Sequence, np.ndarray],
) -> typing.Union[np.floating, np.ndarray]:
r"""Inverse normal distribution.
Returns the argument :math:`x`
for which the area under the Gaussian probability density function
is equal to :math:`y`.
It returns :math:`\text{nan}`
if :math:`y \notin [0, 1]`.
The area under the Gaussian probability density function is given by:
.. math::
\frac{1}{\sqrt{2\pi}} \int_{-\infty}^x \exp(-t^2 / 2)\,\text{d}t
This function is a :mod:`numpy` port
of the `Cephes C code`_.
Douglas Thor `implemented it in pure Python`_ under GPL-3.
The output is identical to the implementation
provided by :func:`scipy.special.ndtri`,
and :func:`scipy.stats.norm.ppf`,
and allows you
to avoid installing
and importing :mod:`scipy`.
:func:`audmath.inverse_normal_distribution`
is slower for large arrays
as the following comparison of execution times
on a standard PC show.
.. table::
========== ======= =======
Samples scipy audmath
========== ======= =======
10.000 0.00s 0.01s
100.000 0.00s 0.09s
1.000.000 0.02s 0.88s
10.000.000 0.25s 9.30s
========== ======= =======
.. _Cephes C code: https://github.com/jeremybarnes/cephes/blob/60f27df395b8322c2da22c83751a2366b82d50d1/cprob/ndtri.c
.. _implemented it in pure Python: https://github.com/dougthor42/PyErf/blob/cf38a2c62556cbd4927c9b3f5523f39b6a492472/pyerf/pyerf.py#L183-L287
Args:
y: input value
Returns:
inverted input
Examples:
>>> inverse_normal_distribution([0.05, 0.4, 0.6, 0.95])
array([-1.64485363, -0.2533471 , 0.2533471 , 1.64485363])
""" # noqa: E501
if isinstance(y, np.ndarray):
y = y.copy()
y = np.atleast_1d(y)
x = np.zeros(y.shape)
switch_sign = np.ones(y.shape)
# Handle edge cases
idx1 = y == 0
x[idx1] = -np.inf
idx2 = y == 1
x[idx2] = np.inf
idx3 = y < 0
x[idx3] = np.nan
idx4 = y > 1
x[idx4] = np.nan
non_valid = np.array([any(i) for i in zip(idx1, idx2, idx3, idx4)])
# Return if no other values are left
if non_valid.sum() == len(x):
return x.astype(np.float64)
switch_sign[non_valid] = 0
# Pre-calculate to avoid recalculation
exp_neg2 = np.exp(-2)
# Approximation for 0 <= |y - 0.5| <= 3/8
p0 = [
-5.99633501014107895267e1,
9.80010754185999661536e1,
-5.66762857469070293439e1,
1.39312609387279679503e1,
-1.23916583867381258016e0,
]
q0 = [
1.0,
1.95448858338141759834e0,
4.67627912898881538453e0,
8.63602421390890590575e1,
-2.25462687854119370527e2,
2.00260212380060660359e2,
-8.20372256168333339912e1,
1.59056225126211695515e1,
-1.18331621121330003142e0,
]
# Approximation for interval z = sqrt(-2 log y ) between 2 and 8,
# i.e. y between exp(-2) = .135 and exp(-32) = 1.27e-14
p1 = [
4.05544892305962419923e0,
3.15251094599893866154e1,
5.71628192246421288162e1,
4.40805073893200834700e1,
1.46849561928858024014e1,
2.18663306850790267539e0,
-1.40256079171354495875e-1,
-3.50424626827848203418e-2,
-8.57456785154685413611e-4,
]
q1 = [
1.0,
1.57799883256466749731e1,
4.53907635128879210584e1,
4.13172038254672030440e1,
1.50425385692907503408e1,
2.50464946208309415979e0,
-1.42182922854787788574e-1,
-3.80806407691578277194e-2,
-9.33259480895457427372e-4,
]
# Approximation for interval z = sqrt(-2 log y ) between 8 and 64,
# i.e. y between exp(-32) = 1.27e-14 and exp(-2048) = 3.67e-890
p2 = [
3.23774891776946035970e0,
6.91522889068984211695e0,
3.93881025292474443415e0,
1.33303460815807542389e0,
2.01485389549179081538e-1,
1.23716634817820021358e-2,
3.01581553508235416007e-4,
2.65806974686737550832e-6,
6.23974539184983293730e-9,
]
q2 = [
1.0,
6.02427039364742014255e0,
3.67983563856160859403e0,
1.37702099489081330271e0,
2.16236993594496635890e-1,
1.34204006088543189037e-2,
3.28014464682127739104e-4,
2.89247864745380683936e-6,
6.79019408009981274425e-9,
]
idx1 = y > (1 - exp_neg2) # y > 0.864...
idx = np.where(non_valid, False, idx1)
y[idx] = 1.0 - y[idx]
switch_sign[idx] = 0
# Case where we don't need high precision
idx2 = y > exp_neg2 # y > 0.135...
idx = np.where(non_valid, False, idx2)
y[idx] = y[idx] - 0.5
y2 = y[idx] ** 2
x[idx] = y[idx] + y[idx] * (y2 * polyval(y2, p0) / polyval(y2, q0))
x[idx] = x[idx] * np.sqrt(2 * np.pi)
switch_sign[idx] = 0
idx3 = ~idx2
idx = np.where(non_valid, False, idx3)
x[idx] = np.sqrt(-2.0 * np.log(y[idx]))
x0 = x[idx] - np.log(x[idx]) / x[idx]
z = 1.0 / x[idx]
x1 = np.where(
x[idx] < 8.0, # y > exp(-32) = 1.2664165549e-14
z * polyval(z, p1) / polyval(z, q1),
z * polyval(z, p2) / polyval(z, q2),
)
x[idx] = x0 - x1
x = np.where(switch_sign == 1, -1 * x, x)
return x.astype(np.float64)
[docs]def rms(
x: typing.Union[int, float, typing.Sequence, np.ndarray],
*,
axis: typing.Union[int, typing.Tuple[int]] = None,
keepdims: bool = False,
) -> typing.Union[np.floating, np.ndarray]:
r"""Root mean square.
The root mean square
for a signal of length :math:`N`
is given by
.. math::
\sqrt{\frac{1}{N} \sum_{n=1}^N x_n^2}
where :math:`x_n` is the value
of a single sample
of the signal.
For an empty signal
0 is returned.
Args:
x: input signal
axis: axis or axes
along which the root mean squares are computed.
The default is to compute the root mean square
of the flattened signal
keepdims: if this is set to ``True``,
the axes which are reduced
are left in the result
as dimensions with size one
Returns:
root mean square of input signal
Examples:
>>> rms([])
np.float64(0.0)
>>> rms([0, 1])
np.float64(0.7071067811865476)
>>> rms([[0, 1], [0, 1]])
np.float64(0.7071067811865476)
>>> rms([[0, 1], [0, 1]], keepdims=True)
array([[0.70710678]])
>>> rms([[0, 1], [0, 1]], axis=1)
array([0.70710678, 0.70710678])
"""
x = np.array(x)
if x.size == 0:
return np.float64(0.0)
return np.sqrt(np.mean(np.square(x), axis=axis, keepdims=keepdims))
[docs]def samples(
duration: float,
sampling_rate: int,
) -> int:
r"""Duration in samples.
The duration is evenly rounded,
after converted to samples.
Args:
duration: duration in s
sampling_rate: sampling rate in Hz
Returns:
duration in number of samples
Examples:
>>> samples(0.5, 10)
5
>>> samples(0.55, 10)
6
"""
return round(duration * sampling_rate)
[docs]def similarity(
u: typing.Union[typing.Sequence, np.ndarray],
v: typing.Union[typing.Sequence, np.ndarray],
) -> typing.Union[np.floating, np.ndarray]:
r"""Cosine similarity between two arrays.
If the incoming arrays are of size
:math:`(k,)`,
a single similarity value is returned.
If one of the incoming arrays is of size
:math:`(n, k)`,
an array of size
:math:`(n,)`
with similarities is returned.
If the arrays are of size
:math:`(n, k)`
and :math:`(m, k)`
an array of size
:math:`(n, m)`
with similarities is returned.
The input arrays can also be provided as
:class:`pandas.DataFrame`
or :class:`pandas.Series`.
The cosine similarity is given by
:math:`\frac{u \cdot v}{\lVert u\rVert_2 \lVert v\rVert_2}`.
Args:
u: input array
v: input array
Returns:
similarity between arrays
Example:
>>> similarity([1, 0], [1, 0])
np.float64(1.0)
>>> similarity([1, 0], [0, 1])
np.float64(0.0)
>>> similarity([1, 0], [-1, 0])
np.float64(-1.0)
>>> similarity([[1, 0]], [1, 0])
array([1.])
>>> similarity([1, 0], [[1, 0], [0, 1]])
array([1., 0.])
>>> similarity([[1, 0], [0, 1]], [[1, 0]])
array([[1.],
[0.]])
>>> similarity([[1, 0], [0, 1]], [[1, 0], [0, 1], [-1, 0]])
array([[ 1., 0., -1.],
[ 0., 1., 0.]])
"""
def to_numpy(x):
if not isinstance(x, np.ndarray):
try:
# pandas object
x = x.to_numpy()
except AttributeError:
# sequence
x = np.array(x)
return x
u = to_numpy(u)
v = to_numpy(v)
# Infer output shape from input
output_shape = "[[..]]"
if u.ndim == 1 and v.ndim == 1:
output_shape = ".."
elif u.ndim == 1 or v.ndim == 1:
output_shape = "[..]"
u = np.atleast_2d(u)
v = np.atleast_2d(v)
u = u / np.linalg.norm(u, ord=2, keepdims=True, axis=-1)
v = v / np.linalg.norm(v, ord=2, keepdims=True, axis=-1)
sim = np.inner(u, v) # always returns [[..]]
# Convert to desired output shape
if output_shape == "..":
sim = np.float64(sim.squeeze())
elif output_shape == "[..]":
if sim.size == 1:
sim = sim[0]
else:
sim = sim.squeeze()
return sim
[docs]def window(
samples: int,
shape: str = "tukey",
half: str = None,
) -> np.ndarray:
r"""Return a window.
The window will start from
and/or end at 0.
If at least 3 samples are requested
and the number of samples is odd,
the windows maximum value will always be 1.
The shape of the window
is selected via ``shape``
The following figure shows all available shapes.
For the Kaiser window
we use :math:`\beta = 14`
and set its first sample to 0.
.. plot::
import audmath
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import numpy as np
import seaborn as sns
for shape in audmath.core.api.WINDOW_SHAPES:
win = audmath.window(101, shape=shape)
plt.plot(win, label=shape)
plt.ylabel('Magnitude')
plt.xlabel('Window Length')
plt.grid(alpha=0.4)
ax = plt.gca()
ax.xaxis.set_major_formatter(mtick.PercentFormatter())
ax.tick_params(axis=u'both', which=u'both',length=0)
plt.xlim([-1.2, 100.3])
plt.ylim([-0.02, 1])
sns.despine(left=True, bottom=True)
# Put a legend to the top right of the current axis
plt.legend()
ax.legend(loc='upper left', bbox_to_anchor=(1, 1))
# Adjsut image size to contain outside legend
fig = plt.gcf()
fig.set_size_inches(6.4, 3.84)
plt.tight_layout()
Args:
samples: length of window
shape: shape of window
half: if ``None`` return whole window,
if ``'left'`` or ``'right'``
return left or right half-window.
Other than the whole window
the half-windows
will always contain 1
as maximum value
as long as ``samples`` > 1
Returns:
window
Raises:
ValueError: if ``shape`` is not supported
ValueError: if ``half`` is not supported
Examples:
>>> window(7)
array([0. , 0.25, 0.75, 1. , 0.75, 0.25, 0. ])
>>> window(6)
array([0. , 0.25, 0.75, 0.75, 0.25, 0. ])
>>> window(5, shape="linear", half="left")
array([0. , 0.25, 0.5 , 0.75, 1. ])
"""
if shape not in WINDOW_SHAPES:
raise ValueError(
"shape has to be one of the following: "
f"{(', ').join(WINDOW_SHAPES)},"
f"not '{shape}'."
)
if half is not None and half not in ["left", "right"]:
raise ValueError("half has to be 'left' or 'right' " f"not '{half}'.")
def left(samples, shape):
if samples < 2:
win = np.arange(samples)
elif shape == "linear":
win = np.arange(samples) / (samples - 1)
elif shape == "kaiser":
# Kaiser windows as approximation of DPSS window
# as often used for tapering windows
win = np.kaiser(2 * (samples - 1), beta=14)[: (samples - 1)]
# Ensure first entry is 0
win[0] = 0
# Add 1 at the end
win = np.concatenate([win, np.array([1])])
elif shape == "tukey":
# Tukey window,
# which is also often used as tapering window
# 1/2 * (1 - cos(2pi n / (4N alpha)))
x = np.arange(samples)
alpha = 0.5
width = 4 * (samples - 1) * alpha
win = 0.5 * (1 - np.cos(2 * np.pi * x / width))
elif shape == "exponential":
x = np.arange(samples)
win = (np.exp(x) - 1) / (np.exp(samples - 1) - 1)
elif shape == "logarithmic":
x = np.arange(samples)
win = np.log10(x + 1) / np.log10(samples)
return win.astype(np.float64)
if half is None:
# For odd (1, 3, 5, ...) number of samples
# we include 1 as window maximum.
# For even numbers we exclude 1 as window maximum
if samples % 2 != 0:
left_win = left(int(np.ceil(samples / 2)), shape)
right_win = np.flip(left_win)[1:]
else:
left_win = left(int(samples / 2) + 1, shape)[:-1]
right_win = np.flip(left_win)
win = np.concatenate([left_win, right_win])
elif half == "left":
win = left(samples, shape)
elif half == "right":
win = np.flip(left(samples, shape))
return win