# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import scipy.signal
[docs]def signal_psd(
signal, sampling_rate=1000, method="welch", show=True, min_frequency=0, max_frequency=np.inf, window=None, ar_order=15, order_criteria="KIC", order_corrected=True, burg_norm=True
):
"""Compute the Power Spectral Density (PSD).
Parameters
----------
signal : Union[list, np.array, pd.Series]
The signal (i.e., a time series) in the form of a vector of values.
sampling_rate : int
The sampling frequency of the signal (in Hz, i.e., samples/second).
method : str
Either 'multitapers' (default; requires the 'mne' package), or 'welch' (requires the 'scipy' package).
show : bool
If True, will return a plot. If False, will return the density values that can be plotted externally.
min_frequency : float
The minimum frequency.
max_frequency : float
The maximum frequency.
window : int
Length of each window in seconds (for Welch method). If None (default), window will be automatically
calculated to capture at least 2 cycles of min_frequency. If the length of recording does not
allow the formal, window will be default to half of the length of recording.
ar_order : int
The order of autoregression (for AR methods e.g. Burg).
order_criteria : str
The criteria to automatically select order in parametric PSD (for AR methods e.g. Burg).
order_corrected : bool
Specify for AIC and KIC order_criteria. If unsure which method to use to choose the order,
rely on the default of corrected KIC.
bug_norm : bool
Normalization for Burg method.
See Also
--------
signal_filter, mne.time_frequency.psd_array_multitaper, scipy.signal.welch
Returns
-------
pd.DataFrame
A DataFrame containing the Power Spectrum values and a plot if
`show` is True.
Examples
--------
>>> import neurokit2 as nk
>>>
>>> signal = nk.signal_simulate(frequency=5) + 0.5*nk.signal_simulate(frequency=20)
>>>
>>> fig1 = nk.signal_psd(signal, method="multitapers")
>>> fig1 #doctest: +SKIP
>>> fig2 = nk.signal_psd(signal, method="welch", min_frequency=1)
>>> fig2 #doctest: +SKIP
>>> fig3 = nk.signal_psd(signal, method="burg", min_frequency=1)
>>>
>>> data = nk.signal_psd(signal, method="multitapers", max_frequency=30, show=False)
>>> fig4 = data.plot(x="Frequency", y="Power")
>>> fig4 #doctest: +SKIP
>>> data = nk.signal_psd(signal, method="welch", max_frequency=30, show=False, min_frequency=1)
>>> fig5 = data.plot(x="Frequency", y="Power")
>>> fig5 #doctest: +SKIP
"""
# Constant Detrend
signal = signal - np.mean(signal)
# MNE
if method.lower() in ["multitapers", "mne"]:
frequency, power = _signal_psd_multitaper(signal, sampling_rate=sampling_rate, min_frequency=min_frequency, max_frequency=max_frequency)
else:
# Define window length
if min_frequency == 0:
min_frequency = 0.001 # sanitize lowest frequency
if window is not None:
nperseg = int(window * sampling_rate)
else:
# to capture at least 2 cycles of min_frequency
nperseg = int((2 / min_frequency) * sampling_rate)
# in case duration of recording is not sufficient
if nperseg > len(signal) / 2:
print(
"Neurokit warning: signal_psd(): The duration of recording is too short to support a "
"sufficiently long window for high frequency resolution. Consider using a longer recording "
"or increasing the `min_frequency`"
)
nperseg = int(len(signal) / 2)
# Welch (Scipy)
if method.lower() in ["welch"]:
frequency, power = _signal_psd_welch(
signal,
sampling_rate=sampling_rate,
nperseg=nperseg
)
# Lombscargle (Scipy)
elif method.lower() in ["lombscargle", "lomb"]:
frequency, power = _signal_psd_lomb(
signal,
sampling_rate=sampling_rate,
nperseg=nperseg,
min_frequency=min_frequency,
max_frequency=max_frequency
)
# BURG
elif method.lower() in ["burg", "pburg", "spectrum"]:
frequency, power = _signal_psd_burg(
signal,
sampling_rate=sampling_rate,
order=ar_order,
criteria=order_criteria,
corrected=order_corrected,
side="one-sided",
norm=burg_norm,
nperseg=nperseg
)
# Store results
data = pd.DataFrame({"Frequency": frequency, "Power": power})
# Filter
data = data.loc[np.logical_and(data["Frequency"] >= min_frequency, data["Frequency"] <= max_frequency)]
if show is True:
ax = data.plot(x="Frequency", y="Power", logy=True, title="Power Spectral Density (ms^2/Hz)")
ax.set(xlabel="Frequency (Hz)", ylabel="Spectrum")
return ax
else:
return data
# =============================================================================
# Multitaper method
# =============================================================================
def _signal_psd_multitaper(
signal, sampling_rate=1000, min_frequency=0, max_frequency=np.inf
):
try:
import mne
power, frequency = mne.time_frequency.psd_array_multitaper(
signal,
sfreq=sampling_rate,
fmin=min_frequency,
fmax=max_frequency,
adaptive=True,
normalization="full",
verbose=False,
)
except ImportError:
raise ImportError(
"NeuroKit warning: signal_psd(): the 'mne'",
"module is required for the 'mne' method to run.",
"Please install it first (`pip install mne`).",
)
return frequency, power
# =============================================================================
# Welch method
# =============================================================================
def _signal_psd_welch(
signal, sampling_rate=1000, nperseg=None
):
frequency, power = scipy.signal.welch(
signal,
fs=sampling_rate,
scaling="density",
detrend=False,
nfft=int(nperseg * 2),
average="mean",
nperseg=nperseg,
)
return frequency, power
# =============================================================================
# Lomb method
# =============================================================================
def _signal_psd_lomb(
signal, sampling_rate=1000, nperseg=None, min_frequency=0, max_frequency=np.inf
):
nfft = int(nperseg * 2)
if max_frequency == np.inf:
max_frequency = 20 # sanitize highest frequency
# Specify frequency range
frequency = np.linspace(min_frequency, max_frequency, nfft)
# Compute angular frequencies
# angular_freqs = np.asarray(2 * np.pi / frequency)
# Specify sample times
t = np.arange(len(signal))
power = np.asarray(scipy.signal.lombscargle(t, signal, frequency, normalize=True))
return frequency, power
# =============================================================================
# Burg method
# =============================================================================
def _signal_psd_burg(signal, sampling_rate=1000, order=15, criteria="KIC", corrected=True, side="one-sided", norm=True, nperseg=None):
nfft = int(nperseg * 2)
ar, rho, ref = _signal_arma_burg(signal, order=order, criteria=criteria, corrected=corrected, side=side, norm=norm)
psd = _signal_psd_from_arma(ar=ar, rho=rho, sampling_rate=sampling_rate, nfft=nfft, side=side, norm=norm)
# signal is real, not complex
if nfft % 2 == 0:
power = psd[0:int(nfft / 2 + 1)] * 2
else:
power = psd[0:int((nfft + 1) / 2)] * 2
# angular frequencies, w
# for one-sided psd, w spans [0, pi]
# for two-sdied psd, w spans [0, 2pi)
# for dc-centered psd, w spans (-pi, pi] for even nfft, (-pi, pi) for add nfft
if side == "one-sided":
w = np.pi * np.linspace(0, 1, len(power))
# elif side == "two-sided":
# w = np.pi * np.linspace(0, 2, len(power), endpoint=False) #exclude last point
# elif side == "centerdc":
# if nfft % 2 == 0:
# w = np.pi * np.linspace(-1, 1, len(power))
# else:
# w = np.pi * np.linspace(-1, 1, len(power) + 1, endpoint=False) # exclude last point
# w = w[1:] # exclude first point (extra)
frequency = (w * sampling_rate) / (2 * np.pi)
return frequency, power
def _signal_arma_burg(signal, order=15, criteria="KIC", corrected=True, side="one-sided", norm=True):
# Sanitize order and signal
if order <= 0.:
raise ValueError("Order must be > 0")
if order > len(signal):
raise ValueError("Order must be less than length signal minus 2")
if not isinstance(signal, np.ndarray):
signal = np.array(signal)
N = len(signal)
# Initialisation
# rho is variance of driving white noise process (prediction error)
rho = sum(abs(signal)**2.) / float(N)
denominator = rho * 2. * N
ar = np.zeros(0, dtype=complex) # AR parametric signal model estimate
ref = np.zeros(0, dtype=complex) # vector K of reflection coefficients (parcor coefficients)
ef = signal.astype(complex) # forward prediction error
eb = signal.astype(complex) # backward prediction error
temp = 1.
# Main recursion
for k in range(0, order):
# calculate the next order reflection coefficient
numerator = sum([ef[j]*eb[j - 1].conjugate() for j in range(k + 1, N)])
denominator = temp * denominator - abs(ef[k])**2 - abs(eb[N - 1])**2
kp = -2. * numerator / denominator
# Update the prediction error
temp = 1. - abs(kp)**2.
new_rho = temp * rho
if criteria is not None:
# k=k+1 because order goes from 1 to P whereas k starts at 0.
residual_new = _criteria(criteria=criteria, N=N, k=k+1, rho=new_rho, corrected=corrected)
if k == 0:
residual_old = 2. * abs(residual_new)
# Stop as criteria has reached
if residual_new > residual_old:
break
# This should be after the criteria
residual_old = residual_new
rho = new_rho
if rho <= 0:
raise ValueError("Found a negative value (expected positive strictly) %s."
"Decrease the order" % rho)
ar = np.resize(ar, ar.size + 1)
ar[k] = kp
if k == 0:
for j in range(N-1, k, -1):
ef_previous = ef[j] # previous value
ef[j] = ef_previous + kp * eb[j-1] # Eq. (8.7)
eb[j] = eb[j-1] + kp.conjugate() * ef_previous
else:
# Update the AR coeff
khalf = (k + 1) // 2 # khalf must be an integer
for j in range(0, khalf):
ar_previous = ar[j] # previous value
ar[j] = ar_previous + kp * ar[k-j-1].conjugate() # Eq. (8.2)
if j != k-j-1:
ar[k-j-1] = ar[k-j-1] + kp * ar_previous.conjugate() # Eq. (8.2)
# Update the forward and backward prediction errors
for j in range(N-1, k, -1):
ef_previous = ef[j] # previous value
ef[j] = ef_previous + kp * eb[j-1] # Eq. (8.7)
eb[j] = eb[j-1] + kp.conjugate() * ef_previous
# save the reflection coefficient
ref = np.resize(ref, ref.size + 1)
ref[k] = kp
return ar, rho, ref
# =============================================================================
# Utilities
# =============================================================================
def _criteria(criteria=None, N=None, k=None, rho=None, corrected=True):
"""Criteria to automatically select order in parametric PSD.
AIC, AICc, KIC and AKICc are based on information theory. They attempt to balance the complexity
(or length) of the model against how well the model fits the data.
AIC and KIC are biased estimates of the asymmetric and the symmetric Kullback-Leibler divergence
respectively. AICc and AKICc attempt to correct the bias.
Parameters
----------
criteria : str
The criteria to be used. The critera can be one of the following: AIC (Akaike Information Criterion),
KIC (Kullback Iinformation Criterion), FPE (Final Prediction Error Criterion), MDL (Minimum
Description Length), CAT (Criterion Autoregressive Transfer Function), AIC order-selection using
eigen values, MDL order-selection using eigen values.
N : int
The sample size of the signal.
k : int
The AR order.
rho : int
The rho at order k.
corrected : bool
Specify for AIC and KIC methods.
"""
if criteria == "AIC":
if corrected is True:
residual = np.log(rho) + 2. * (k + 1) / (N - k - 2)
else:
residual = N * np.log(np.array(rho)) + 2. * (np.array(k) + 1)
elif criteria == "KIC":
if corrected is True:
residual = np.log(rho) + k/N/(N-k) + (3. - (k + 2.) / N) * (k + 1.) / (N - k - 2.)
else:
residual = np.log(rho) + 3. * (k + 1.) / float(N)
elif criteria == "FPE":
fpe = rho * (N + k + 1.) / (N - k - 1)
return fpe
elif criteria == "MDL":
mdl = N * np.log(rho) + k * np.log(N)
return mdl
return residual
def _signal_psd_from_arma(ar=None, ma=None, rho=1., sampling_rate=1000, nfft=None, side="one-sided", norm=False):
if ar is None and ma is None:
raise ValueError("Either AR or MA model must be provided")
psd = np.zeros(nfft, dtype=complex)
if ar is not None:
ip = len(ar)
den = np.zeros(nfft, dtype=complex)
den[0] = 1.+0j
for k in range(0, ip):
den[k+1] = ar[k]
denf = np.fft.fft(den, nfft)
if ma is not None:
iq = len(ma)
num = np.zeros(nfft, dtype=complex)
num[0] = 1.+0j
for k in range(0, iq):
num[k+1] = ma[k]
numf = np.fft(num, nfft)
if ar is not None and ma is not None:
psd = rho / sampling_rate * abs(numf)**2. / abs(denf)**2.
elif ar is not None:
psd = rho / sampling_rate / abs(denf)**2.
elif ma is not None:
psd = rho / sampling_rate * abs(numf)**2.
psd = np.real(psd) # The PSD is a twosided PSD.
# convert to one-sided
if side == "one-sided":
assert len(psd) % 2 == 0
one_side_psd = np.array(psd[0:len(psd)//2 + 1]) * 2.
one_side_psd[0] /= 2.
# one_side_psd[-1] = psd[-1]
psd = one_side_psd
# convert to centerdc
elif side == "centerdc":
first_half = psd[0:len(psd)//2]
second_half = psd[len(psd)//2:]
rotate_second_half = (second_half[-1:] + second_half[:-1])
center_psd = np.concatenate((rotate_second_half, first_half))
center_psd[0] = psd[-1]
psd = center_psd
if norm is True:
psd /= max(psd)
return psd