import collections
import re
import typing
import numpy as np
from audmath.core.utils import polyval
VALUE_UNIT_PATTERN = re.compile(
'^ *' # space
'(' # 1st group: value
'[\\-\\+]?[0-9]*[.]?[0-9]*'
')'
' *' # space
'(' # 2nd group: unit
'[a-zA-Zμ]*'
')'
' *$' # space
)
WINDOW_SHAPES = [
'tukey',
'kaiser',
'linear',
'exponential',
'logarithmic',
]
[docs]def db(
x: typing.Union[int, float, typing.Sequence, np.ndarray],
*,
bottom: typing.Union[int, float] = -120,
) -> typing.Union[np.floating, np.ndarray]:
r"""Convert value to decibels.
The decibel of a value :math:`x \in \R`
is given by
.. math::
\text{db}(x) = \begin{cases}
20 \log_{10} x,
& \text{if } x > 10^\frac{\text{bottom}}{20} \\
\text{bottom},
& \text{else}
\end{cases}
where :math:`\text{bottom}` is provided
by the argument of same name.
Args:
x: input value(s)
bottom: minimum decibel value
returned for very low input values.
If set to ``None``
it will return ``-np.Inf``
for values equal or less than 0
Returns:
input value(s) in dB
Examples:
>>> db(1)
0.0
>>> db(0)
-120.0
>>> db(2)
6.020599913279624
>>> db([0, 1])
array([-120., 0.])
"""
if bottom is None:
min_value = 0
bottom = -np.Inf
else:
bottom = np.float64(bottom)
min_value = 10 ** (bottom / 20)
if not isinstance(x, (collections.abc.Sequence, np.ndarray)):
if x <= min_value:
return bottom
else:
return 20 * np.log10(x)
x = np.array(x)
if x.size == 0:
return x
if not np.issubdtype(x.dtype, np.floating):
x = x.astype(np.float64)
mask = (x <= min_value)
x[mask] = bottom
x[~mask] = 20 * np.log10(x[~mask])
return x
[docs]def duration_in_seconds(
duration: typing.Optional[
typing.Union[float, int, str, np.timedelta64]
],
sampling_rate: typing.Union[float, int] = None,
) -> np.floating:
r"""Duration in seconds.
Converts the given duration value to seconds.
A unit can be provided
when ``duration`` is given as a string.
As units the following values are possible.
.. table::
==================================================== ===========
Unit Meaning
==================================================== ===========
W week
D, days, day day
h, hours, hour, hr hour
m, minutes, minute, min, T minute
s, seconds, second, sec, S second
ms, milliseconds, millisecond, millis, milli, L millisecond
us, μs, microseconds, microsecond, micros, micro, U microsecond
ns, nanoseconds, nanoseconds, nanos, nano, N nanosecond
==================================================== ===========
.. _numpy's datetime units: https://numpy.org/doc/stable/reference/arrays.datetime.html#datetime-units
Args:
duration: if ``duration`` is
a float,
integer
or string without unit
it is treated as seconds
or if ``sampling_rate`` is provided
as samples.
If ``duration`` is provided as a string with unit,
e.g. ``'2ms'`` or ``'2 ms'``,
or as a :class:`numpy.timedelta64`
or :class:`pandas.Timedelta` object
it will be converted to seconds
and ``sampling_rate`` is always ignored.
If duration is
``None``,
:obj:`numpy.nan`,
:obj:`pandas.NA`,
:obj:`pandas.NaT`,
``''``,
``'None'``,
``'NaN'``,
``'NaT'``,
or any other lower/mixed case version of those strings
:obj:`numpy.nan` is returned.
If duration is
:obj:`numpy.inf`,
``'Inf'``
or any other lower/mixed case version of that string
:obj:`numpy.inf` is returned,
and ``-``:obj:`numpy.inf` for the negative case
sampling_rate: sampling rate in Hz.
Is ignored
if duration is provided with a unit
Returns:
duration in seconds
Raises:
ValueError: if the provided unit is not supported
ValueError: if ``duration`` is a string
that does not match a valid '<value><unit>' pattern
Examples:
>>> duration_in_seconds(2)
2.0
>>> duration_in_seconds(2.0)
2.0
>>> duration_in_seconds('2')
2.0
>>> duration_in_seconds('2ms')
0.002
>>> duration_in_seconds('2 ms')
0.002
>>> duration_in_seconds('ms')
0.001
>>> duration_in_seconds(2000, sampling_rate=1000)
2.0
>>> duration_in_seconds(np.timedelta64(2, 's'))
2.0
>>> duration_in_seconds(pd.to_timedelta(2, 's'))
2.0
>>> duration_in_seconds('Inf')
inf
>>> duration_in_seconds(None)
nan
""" # noqa: E501
# Dictionary with allowed unit entries
# and mapping from pandas.to_timedelta()
# to numpy.timedelta64, see
# https://numpy.org/doc/stable/reference/arrays.datetime.html#datetime-units
unit_mapping = {
# week
'W': 'W',
# day
'D': 'D',
'days': 'D',
'day': 'D',
# hour
'h': 'h',
'hours': 'h',
'hour': 'h',
'hr': 'h',
# minute
'm': 'm',
'minutes': 'm',
'minute': 'm',
'min': 'm',
'T': 'm',
# second
's': 's',
'seconds': 's',
'second': 's',
'sec': 's',
'S': 's',
# millisecond
'ms': 'ms',
'milliseconds': 'ms',
'millisecond': 'ms',
'millis': 'ms',
'milli': 'ms',
'L': 'ms',
# microsecond
'us': 'us',
'μs': 'us',
'microseconds': 'us',
'microsecond': 'us',
'micros': 'us',
'micro': 'us',
'U': 'us',
# nanosecond
'ns': 'ns',
'nanoseconds': 'ns',
'nanosecond': 'ns',
'nanos': 'ns',
'nano': 'ns',
'N': 'ns',
}
# numpy.timedelta64() accepts only integer as input values,
# so we need to convert all values
# to nanoseconds and integers first
def to_nanos(value, unit):
if unit == 'W':
value = value * 7 * 24 * 60 * 60 * 10 ** 9
elif unit == 'D':
value = value * 24 * 60 * 60 * 10 ** 9
elif unit == 'h':
value = value * 60 * 60 * 10 ** 9
elif unit == 'm':
value = value * 60 * 10 ** 9
elif unit == 's':
value = value * 10 ** 9
elif unit == 'ms':
value = value * 10 ** 6
elif unit == 'us':
value = value * 10 ** 3
return int(value)
if isinstance(duration, str):
# none/-inf/inf duration
if duration.lower() in ['', 'none', 'nan', 'nat']:
return np.NaN
elif duration.lower() == '-inf':
return -np.inf
elif duration.lower() == 'inf' or duration.lower() == '+inf':
return np.inf
# ensure we have a str and not numpy.str_
duration = str(duration)
match = re.match(VALUE_UNIT_PATTERN, duration)
if match is not None:
value, unit = match.groups()
if (
match is None
or (not value and not unit)
):
raise ValueError(
f"Your given duration '{duration}' "
"is not conform to the <value><unit> pattern."
)
if not value or value == '+':
value = 1.0
elif value == '-':
value = -1.0
else:
value = float(value)
if not unit:
if sampling_rate is None:
duration = value
else:
duration = value / sampling_rate
else:
if unit not in unit_mapping:
raise ValueError(
f"The provided unit '{unit}' is not known."
)
unit = unit_mapping[unit]
# duration in nanoseconds
duration = np.timedelta64(to_nanos(value, unit), 'ns')
# duration in seconds
duration = duration / np.timedelta64(1, 's')
elif isinstance(duration, np.timedelta64):
duration = duration / np.timedelta64(1, 's')
# support for pandas.Timedelta
# without dependency to pandas
elif duration.__class__.__name__ == 'Timedelta':
duration = duration.total_seconds()
# handle nan/none durations
elif (
duration is None
or duration.__class__.__name__ == 'NaTType'
or duration.__class__.__name__ == 'NAType'
or np.isnan(duration)
):
return np.NaN
elif sampling_rate is not None:
duration = duration / sampling_rate
return np.float64(duration)
[docs]def inverse_db(
y: typing.Union[int, float, typing.Sequence, np.ndarray],
*,
bottom: typing.Union[int, float] = -120,
) -> typing.Union[np.floating, np.ndarray]:
r"""Convert decibels to amplitude value.
The inverse of a value :math:`y \in \R`
provided in decibel
is given by
.. math::
\text{inverse\_db}(y) = \begin{cases}
10^\frac{y}{20},
& \text{if } y > \text{bottom} \\
0,
& \text{else}
\end{cases}
where :math:`\text{bottom}` is provided
by the argument of same name.
Args:
y: input signal in decibels
bottom: minimum decibel value
which should be converted.
Lower values will be set to 0.
If set to ``None``
it will return 0
only for input values of ``-np.Inf``
Returns:
input signal
Examples:
>>> inverse_db(0)
1.0
>>> inverse_db(-120)
0.0
>>> inverse_db(-3)
0.7079457843841379
>>> inverse_db([-120, 0])
array([0., 1.])
"""
min_value = 0.
if bottom is None:
bottom = -np.Inf
if not isinstance(y, (collections.abc.Sequence, np.ndarray)):
if y <= bottom:
return min_value
else:
return np.power(10., y / 20.)
y = np.array(y)
if y.size == 0:
return y
if not np.issubdtype(y.dtype, np.floating):
y = y.astype(np.float64)
mask = (y <= bottom)
y[mask] = min_value
y[~mask] = np.power(10., y[~mask] / 20.)
return y
[docs]def inverse_normal_distribution(
y: typing.Union[int, float, typing.Sequence, np.ndarray],
) -> typing.Union[np.floating, np.ndarray]:
r"""Inverse normal distribution.
Returns the argument :math:`x`
for which the area under the Gaussian probability density function
is equal to :math:`y`.
It returns :math:`\text{nan}`
if :math:`y \notin [0, 1]`.
The area under the Gaussian probability density function is given by:
.. math::
\frac{1}{\sqrt{2\pi}} \int_{-\infty}^x \exp(-t^2 / 2)\,\text{d}t
This function is a :mod:`numpy` port
of the `Cephes C code`_.
Douglas Thor `implemented it in pure Python`_ under GPL-3.
The output is identical to the implementation
provided by :func:`scipy.special.ndtri`,
and :func:`scipy.stats.norm.ppf`,
and allows you
to avoid installing
and importing :mod:`scipy`.
:func:`audmath.inverse_normal_distribution`
is slower for large arrays
as the following comparison of execution times
on a standard PC show.
.. table::
========== ======= =======
Samples scipy audmath
========== ======= =======
10.000 0.00s 0.01s
100.000 0.00s 0.09s
1.000.000 0.02s 0.88s
10.000.000 0.25s 9.30s
========== ======= =======
.. _Cephes C code: https://github.com/jeremybarnes/cephes/blob/60f27df395b8322c2da22c83751a2366b82d50d1/cprob/ndtri.c
.. _implemented it in pure Python: https://github.com/dougthor42/PyErf/blob/cf38a2c62556cbd4927c9b3f5523f39b6a492472/pyerf/pyerf.py#L183-L287
Args:
y: input value
Returns:
inverted input
Examples:
>>> inverse_normal_distribution([0.05, 0.4, 0.6, 0.95])
array([-1.64485363, -0.2533471 , 0.2533471 , 1.64485363])
""" # noqa: E501
if isinstance(y, np.ndarray):
y = y.copy()
y = np.atleast_1d(y)
x = np.zeros(y.shape)
switch_sign = np.ones(y.shape)
# Handle edge cases
idx1 = y == 0
x[idx1] = -np.Inf
idx2 = y == 1
x[idx2] = np.Inf
idx3 = y < 0
x[idx3] = np.NaN
idx4 = y > 1
x[idx4] = np.NaN
non_valid = np.array([any(i) for i in zip(idx1, idx2, idx3, idx4)])
# Return if no other values are left
if non_valid.sum() == len(x):
return np.float64(x)
switch_sign[non_valid] = 0
# Pre-calculate to avoid recalculation
exp_neg2 = np.exp(-2)
# Approximation for 0 <= |y - 0.5| <= 3/8
p0 = [
-5.99633501014107895267E1,
9.80010754185999661536E1,
-5.66762857469070293439E1,
1.39312609387279679503E1,
-1.23916583867381258016E0,
]
q0 = [
1.0,
1.95448858338141759834E0,
4.67627912898881538453E0,
8.63602421390890590575E1,
-2.25462687854119370527E2,
2.00260212380060660359E2,
-8.20372256168333339912E1,
1.59056225126211695515E1,
-1.18331621121330003142E0,
]
# Approximation for interval z = sqrt(-2 log y ) between 2 and 8,
# i.e. y between exp(-2) = .135 and exp(-32) = 1.27e-14
p1 = [
4.05544892305962419923E0,
3.15251094599893866154E1,
5.71628192246421288162E1,
4.40805073893200834700E1,
1.46849561928858024014E1,
2.18663306850790267539E0,
-1.40256079171354495875E-1,
-3.50424626827848203418E-2,
-8.57456785154685413611E-4,
]
q1 = [
1.0,
1.57799883256466749731E1,
4.53907635128879210584E1,
4.13172038254672030440E1,
1.50425385692907503408E1,
2.50464946208309415979E0,
-1.42182922854787788574E-1,
-3.80806407691578277194E-2,
-9.33259480895457427372E-4,
]
# Approximation for interval z = sqrt(-2 log y ) between 8 and 64,
# i.e. y between exp(-32) = 1.27e-14 and exp(-2048) = 3.67e-890
p2 = [
3.23774891776946035970E0,
6.91522889068984211695E0,
3.93881025292474443415E0,
1.33303460815807542389E0,
2.01485389549179081538E-1,
1.23716634817820021358E-2,
3.01581553508235416007E-4,
2.65806974686737550832E-6,
6.23974539184983293730E-9,
]
q2 = [
1.0,
6.02427039364742014255E0,
3.67983563856160859403E0,
1.37702099489081330271E0,
2.16236993594496635890E-1,
1.34204006088543189037E-2,
3.28014464682127739104E-4,
2.89247864745380683936E-6,
6.79019408009981274425E-9,
]
idx1 = y > (1 - exp_neg2) # y > 0.864...
idx = np.where(non_valid, False, idx1)
y[idx] = 1.0 - y[idx]
switch_sign[idx] = 0
# Case where we don't need high precision
idx2 = y > exp_neg2 # y > 0.135...
idx = np.where(non_valid, False, idx2)
y[idx] = y[idx] - 0.5
y2 = y[idx] ** 2
x[idx] = y[idx] + y[idx] * (y2 * polyval(y2, p0) / polyval(y2, q0))
x[idx] = x[idx] * np.sqrt(2 * np.pi)
switch_sign[idx] = 0
idx3 = ~idx2
idx = np.where(non_valid, False, idx3)
x[idx] = np.sqrt(-2.0 * np.log(y[idx]))
x0 = x[idx] - np.log(x[idx]) / x[idx]
z = 1.0 / x[idx]
x1 = np.where(
x[idx] < 8.0, # y > exp(-32) = 1.2664165549e-14
z * polyval(z, p1) / polyval(z, q1),
z * polyval(z, p2) / polyval(z, q2),
)
x[idx] = x0 - x1
x = np.where(switch_sign == 1, -1 * x, x)
return np.float64(x)
[docs]def rms(
x: typing.Union[int, float, typing.Sequence, np.ndarray],
*,
axis: typing.Union[int, typing.Tuple[int]] = None,
keepdims: bool = False,
) -> typing.Union[np.floating, np.ndarray]:
r"""Root mean square.
The root mean square
for a signal of length :math:`N`
is given by
.. math::
\sqrt{\frac{1}{N} \sum_{n=1}^N x_n^2}
where :math:`x_n` is the value
of a single sample
of the signal.
For an empty signal
0 is returned.
Args:
x: input signal
axis: axis or axes
along which the root mean squares are computed.
The default is to compute the root mean square
of the flattened signal
keepdims: if this is set to ``True``,
the axes which are reduced
are left in the result
as dimensions with size one
Returns:
root mean square of input signal
Examples:
>>> rms([])
0.0
>>> rms([0, 1])
0.7071067811865476
>>> rms([[0, 1], [0, 1]])
0.7071067811865476
>>> rms([[0, 1], [0, 1]], keepdims=True)
array([[0.70710678]])
>>> rms([[0, 1], [0, 1]], axis=1)
array([0.70710678, 0.70710678])
"""
x = np.array(x)
if x.size == 0:
return np.float64(0.0)
return np.sqrt(np.mean(np.square(x), axis=axis, keepdims=keepdims))
[docs]def samples(
duration: float,
sampling_rate: int,
) -> int:
r"""Duration in samples.
The duration is evenly rounded,
after converted to samples.
Args:
duration: duration in s
sampling_rate: sampling rate in Hz
Returns:
duration in number of samples
Examples:
>>> samples(0.5, 10)
5
>>> samples(0.55, 10)
6
"""
return round(duration * sampling_rate)
[docs]def similarity(
u: typing.Union[typing.Sequence, np.ndarray],
v: typing.Union[typing.Sequence, np.ndarray],
) -> typing.Union[np.floating, np.ndarray]:
r"""Cosine similarity between two arrays.
If the incoming arrays are of size
:math:`(k,)`,
a single similarity value is returned.
If one of the incoming arrays is of size
:math:`(n, k)`,
an array of size
:math:`(n,)`
with similarities is returned.
If the arrays are of size
:math:`(n, k)`
and :math:`(m, k)`
an array of size
:math:`(n, m)`
with similarities is returned.
The input arrays can also be provided as
:class:`pandas.DataFrame`
or :class:`pandas.Series`.
The cosine similarity is given by
:math:`\frac{u \cdot v}{\lVert u\rVert_2 \lVert v\rVert_2}`.
Args:
u: input array
v: input array
Returns:
similarity between arrays
Example:
>>> similarity([1, 0], [1, 0])
1.0
>>> similarity([1, 0], [0, 1])
0.0
>>> similarity([1, 0], [-1, 0])
-1.0
>>> similarity([[1, 0]], [1, 0])
array([1.])
>>> similarity([1, 0], [[1, 0], [0, 1]])
array([1., 0.])
>>> similarity([[1, 0], [0, 1]], [[1, 0]])
array([[1.],
[0.]])
>>> similarity([[1, 0], [0, 1]], [[1, 0], [0, 1], [-1, 0]])
array([[ 1., 0., -1.],
[ 0., 1., 0.]])
"""
def to_numpy(x):
if not isinstance(x, np.ndarray):
try:
# pandas object
x = x.to_numpy()
except AttributeError:
# sequence
x = np.array(x)
return x
u = to_numpy(u)
v = to_numpy(v)
# Infer output shape from input
output_shape = '[[..]]'
if u.ndim == 1 and v.ndim == 1:
output_shape = '..'
elif u.ndim == 1 or v.ndim == 1:
output_shape = '[..]'
u = np.atleast_2d(u)
v = np.atleast_2d(v)
u = u / np.linalg.norm(u, ord=2, keepdims=True, axis=-1)
v = v / np.linalg.norm(v, ord=2, keepdims=True, axis=-1)
sim = np.inner(u, v) # always returns [[..]]
# Convert to desired output shape
if output_shape == '..':
sim = np.float64(sim.squeeze())
elif output_shape == '[..]':
if sim.size == 1:
sim = sim[0]
else:
sim = sim.squeeze()
return sim
[docs]def window(
samples: int,
shape: str = 'tukey',
half: str = None,
) -> np.ndarray:
r"""Return a window.
The window will start from
and/or end at 0.
If at least 3 samples are requested
and the number of samples is odd,
the windows maximum value will always be 1.
The shape of the window
is selected via ``shape``
The following figure shows all available shapes.
For the Kaiser window
we use :math:`\beta = 14`
and set its first sample to 0.
.. plot::
import audmath
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import numpy as np
import seaborn as sns
for shape in audmath.core.api.WINDOW_SHAPES:
win = audmath.window(101, shape=shape)
plt.plot(win, label=shape)
plt.ylabel('Magnitude')
plt.xlabel('Window Length')
plt.grid(alpha=0.4)
ax = plt.gca()
ax.xaxis.set_major_formatter(mtick.PercentFormatter())
ax.tick_params(axis=u'both', which=u'both',length=0)
plt.xlim([-1.2, 100.3])
plt.ylim([-0.02, 1])
sns.despine(left=True, bottom=True)
# Put a legend to the top right of the current axis
plt.legend()
ax.legend(loc='upper left', bbox_to_anchor=(1, 1))
# Adjsut image size to contain outside legend
fig = plt.gcf()
fig.set_size_inches(6.4, 3.84)
plt.tight_layout()
Args:
samples: length of window
shape: shape of window
half: if ``None`` return whole window,
if ``'left'`` or ``'right'``
return left or right half-window.
Other than the whole window
the half-windows
will always contain 1
as maximum value
as long as ``samples`` > 1
Returns:
window
Raises:
ValueError: if ``shape`` is not supported
ValueError: if ``half`` is not supported
Examples:
>>> window(7)
array([0. , 0.25, 0.75, 1. , 0.75, 0.25, 0. ])
>>> window(6)
array([0. , 0.25, 0.75, 0.75, 0.25, 0. ])
>>> window(5, shape='linear', half='left')
array([0. , 0.25, 0.5 , 0.75, 1. ])
"""
if shape not in WINDOW_SHAPES:
raise ValueError(
"shape has to be one of the following: "
f"{(', ').join(WINDOW_SHAPES)},"
f"not '{shape}'."
)
if (
half is not None
and half not in ['left', 'right']
):
raise ValueError(
"half has to be 'left' or 'right' "
f"not '{half}'."
)
def left(samples, shape):
if samples < 2:
win = np.arange(samples)
elif shape == 'linear':
win = np.arange(samples) / (samples - 1)
elif shape == 'kaiser':
# Kaiser windows as approximation of DPSS window
# as often used for tapering windows
win = np.kaiser(2 * (samples - 1), beta=14)[:(samples - 1)]
# Ensure first entry is 0
win[0] = 0
# Add 1 at the end
win = np.concatenate([win, np.array([1])])
elif shape == 'tukey':
# Tukey window,
# which is also often used as tapering window
# 1/2 * (1 - cos(2pi n / (4N alpha)))
x = np.arange(samples)
alpha = 0.5
width = 4 * (samples - 1) * alpha
win = 0.5 * (1 - np.cos(2 * np.pi * x / width))
elif shape == 'exponential':
x = np.arange(samples)
win = (np.exp(x) - 1) / (np.exp(samples - 1) - 1)
elif shape == 'logarithmic':
x = np.arange(samples)
win = np.log10(x + 1) / np.log10(samples)
return win.astype(np.float64)
if half is None:
# For odd (1, 3, 5, ...) number of samples
# we include 1 as window maximum.
# For even numbers we exclude 1 as window maximum
if samples % 2 != 0:
left_win = left(int(np.ceil(samples / 2)), shape)
right_win = np.flip(left_win)[1:]
else:
left_win = left(int(samples / 2) + 1, shape)[:-1]
right_win = np.flip(left_win)
win = np.concatenate([left_win, right_win])
elif half == 'left':
win = left(samples, shape)
elif half == 'right':
win = np.flip(left(samples, shape))
return win