# - * - coding: utf-8 - * -
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.signal
import scipy.stats
from ..signal import signal_findpeaks, signal_plot, signal_smooth, signal_zerocrossings
[docs]def ecg_findpeaks(ecg_cleaned, sampling_rate=1000, method="neurokit", show=False):
"""Find R-peaks in an ECG signal.
Low-level function used by `ecg_peaks()` to identify R-peaks in an ECG signal using a different
set of algorithms. See `ecg_peaks()` for details.
Parameters
----------
ecg_cleaned : Union[list, np.array, pd.Series]
The cleaned ECG channel as returned by `ecg_clean()`.
sampling_rate : int
The sampling frequency of `ecg_signal` (in Hz, i.e., samples/second).
Defaults to 1000.
method : string
The algorithm to be used for R-peak detection. Can be one of 'neurokit' (default),
'pamtompkins1985', 'hamilton2002', 'christov2004', 'gamboa2008', 'elgendi2010', 'engzeemod2012',
'kalidas2017', 'martinez2003', 'rodrigues2020' or 'promac'.
show : bool
If True, will return a plot to visualizing the thresholds used in the algorithm.
Useful for debugging.
Returns
-------
info : dict
A dictionary containing additional information, in this case the
samples at which R-peaks occur, accessible with the key "ECG_R_Peaks".
See Also
--------
ecg_clean, signal_fixpeaks, ecg_peaks, ecg_rate, ecg_process, ecg_plot
Examples
--------
.. plot::
:context: close-figs
>>> import neurokit2 as nk
>>>
>>> ecg = nk.ecg_simulate(duration=10, sampling_rate=1000)
>>> cleaned = nk.ecg_clean(ecg, sampling_rate=1000)
>>> info = nk.ecg_findpeaks(cleaned)
>>> nk.events_plot(info["ECG_R_Peaks"], cleaned) #doctest: +ELLIPSIS
<Figure ...>
>>>
>>> # Different methods
>>> neurokit = nk.ecg_findpeaks(nk.ecg_clean(ecg, method="neurokit"), method="neurokit")
>>> pantompkins1985 = nk.ecg_findpeaks(nk.ecg_clean(ecg, method="pantompkins1985"), method="pantompkins1985")
>>> hamilton2002 = nk.ecg_findpeaks(nk.ecg_clean(ecg, method="hamilton2002"), method="hamilton2002")
>>> martinez2003 = nk.ecg_findpeaks(cleaned, method="martinez2003")
>>> christov2004 = nk.ecg_findpeaks(cleaned, method="christov2004")
>>> gamboa2008 = nk.ecg_findpeaks(nk.ecg_clean(ecg, method="gamboa2008"), method="gamboa2008")
>>> elgendi2010 = nk.ecg_findpeaks(nk.ecg_clean(ecg, method="elgendi2010"), method="elgendi2010")
>>> engzeemod2012 = nk.ecg_findpeaks(nk.ecg_clean(ecg, method="engzeemod2012"), method="engzeemod2012")
>>> kalidas2017 = nk.ecg_findpeaks(nk.ecg_clean(ecg, method="kalidas2017"), method="kalidas2017")
>>> rodrigues2020 = nk.ecg_findpeaks(cleaned, method="rodrigues2020")
>>>
>>> # Visualize
>>> nk.events_plot([neurokit["ECG_R_Peaks"],
... pantompkins1985["ECG_R_Peaks"],
... hamilton2002["ECG_R_Peaks"],
... christov2004["ECG_R_Peaks"],
... gamboa2008["ECG_R_Peaks"],
... elgendi2010["ECG_R_Peaks"],
... engzeemod2012["ECG_R_Peaks"],
... kalidas2017["ECG_R_Peaks"],
... martinez2003["ECG_R_Peaks"],
... rodrigues2020["ECG_R_Peaks"]], cleaned) #doctest: +ELLIPSIS
<Figure ...>
>>>
>>> # Method-agreement
>>> ecg = nk.ecg_simulate(duration=10, sampling_rate=500)
>>> ecg = nk.signal_distort(ecg,
... sampling_rate=500,
... noise_amplitude=0.2, noise_frequency=[25, 50],
... artifacts_amplitude=0.2, artifacts_frequency=50)
>>> nk.ecg_findpeaks(ecg, sampling_rate=1000, method="promac", show=True) #doctest: +ELLIPSIS
{'ECG_R_Peaks': array(...)}
References
--------------
- Gamboa, H. (2008). Multi-modal behavioral biometrics based on hci and electrophysiology.
PhD ThesisUniversidade.
- Zong, W., Heldt, T., Moody, G. B., & Mark, R. G. (2003, September). An open-source algorithm to
detect onset of arterial blood pressure pulses. In Computers in Cardiology, 2003 (pp. 259-262). IEEE.
- Hamilton, Open Source ECG Analysis Software Documentation, E.P.Limited, 2002.
- Pan, J., & Tompkins, W. J. (1985). A real-time QRS detection algorithm. IEEE transactions on
biomedical engineering, (3), 230-236.
- Engelse, W. A. H., & Zeelenberg, C. (1979). A single scan algorithm for QRS detection and feature
extraction IEEE Comput Cardiol. Long Beach: IEEE Computer Society.
- Lourenço, A., Silva, H., Leite, P., Lourenço, R., & Fred, A. L. (2012, February). Real Time
Electrocardiogram Segmentation for Finger based ECG Biometrics. In Biosignals (pp. 49-54).
"""
# Try retrieving right column
if isinstance(ecg_cleaned, pd.DataFrame):
try:
ecg_cleaned = ecg_cleaned["ECG_Clean"]
except NameError:
try:
ecg_cleaned = ecg_cleaned["ECG_Raw"]
except NameError:
ecg_cleaned = ecg_cleaned["ECG"]
method = method.lower() # remove capitalised letters
# Run peak detection algorithm
if method in ["nk", "nk2", "neurokit", "neurokit2"]:
rpeaks = _ecg_findpeaks_neurokit(ecg_cleaned, sampling_rate, show=show)
elif method in ["pantompkins", "pantompkins1985"]:
rpeaks = _ecg_findpeaks_pantompkins(ecg_cleaned, sampling_rate)
elif method in ["gamboa2008", "gamboa"]:
rpeaks = _ecg_findpeaks_gamboa(ecg_cleaned, sampling_rate)
elif method in ["ssf", "slopesumfunction", "zong", "zong2003"]:
rpeaks = _ecg_findpeaks_ssf(ecg_cleaned, sampling_rate)
elif method in ["hamilton", "hamilton2002"]:
rpeaks = _ecg_findpeaks_hamilton(ecg_cleaned, sampling_rate)
elif method in ["christov", "christov2004"]:
rpeaks = _ecg_findpeaks_christov(ecg_cleaned, sampling_rate)
elif method in ["engzee", "engzee2012", "engzeemod", "engzeemod2012"]:
rpeaks = _ecg_findpeaks_engzee(ecg_cleaned, sampling_rate)
elif method in ["elgendi", "elgendi2010"]:
rpeaks = _ecg_findpeaks_elgendi(ecg_cleaned, sampling_rate)
elif method in ["kalidas2017", "swt", "kalidas", "kalidastamil", "kalidastamil2017"]:
rpeaks = _ecg_findpeaks_kalidas(ecg_cleaned, sampling_rate)
elif method in ["martinez2003", "martinez"]:
rpeaks = _ecg_findpeaks_WT(ecg_cleaned, sampling_rate)
elif method in ["rodrigues2020", "rodrigues", "asi"]:
rpeaks = _ecg_findpeaks_rodrigues(ecg_cleaned, sampling_rate)
elif method in ["promac", "all"]:
rpeaks = _ecg_findpeaks_promac(ecg_cleaned, sampling_rate=sampling_rate, threshold=0.33, show=show)
else:
raise ValueError("NeuroKit error: ecg_findpeaks(): 'method' should be one of 'neurokit'" "or 'pamtompkins'.")
# Prepare output.
info = {"ECG_R_Peaks": rpeaks}
return info
# =============================================================================
# Probabilistic Methods-Agreement via Convolution (ProMAC)
# =============================================================================
def _ecg_findpeaks_promac(signal, sampling_rate=1000, threshold=0.33, show=False, **kwargs):
x = np.zeros(len(signal))
x = _ecg_findpeaks_promac_addmethod(signal, sampling_rate, x, _ecg_findpeaks_neurokit, **kwargs)
x = _ecg_findpeaks_promac_addmethod(signal, sampling_rate, x, _ecg_findpeaks_gamboa, **kwargs)
x = _ecg_findpeaks_promac_addmethod(signal, sampling_rate, x, _ecg_findpeaks_ssf, **kwargs)
x = _ecg_findpeaks_promac_addmethod(signal, sampling_rate, x, _ecg_findpeaks_engzee, **kwargs)
x = _ecg_findpeaks_promac_addmethod(signal, sampling_rate, x, _ecg_findpeaks_elgendi, **kwargs)
x = _ecg_findpeaks_promac_addmethod(signal, sampling_rate, x, _ecg_findpeaks_kalidas, **kwargs)
x = _ecg_findpeaks_promac_addmethod(signal, sampling_rate, x, _ecg_findpeaks_WT, **kwargs)
x = _ecg_findpeaks_promac_addmethod(signal, sampling_rate, x, _ecg_findpeaks_rodrigues, **kwargs)
# Rescale
x = x / np.max(x)
convoluted = x.copy()
# Remove below threshold
x[x < threshold] = 0
# Find peaks
peaks = signal_findpeaks(x, height_min=threshold)["Peaks"]
if show is True:
signal_plot([signal, convoluted], standardize=True)
[plt.axvline(x=peak, color="red", linestyle="--") for peak in peaks] # pylint: disable=W0106
return peaks
def _ecg_findpeaks_promac_addmethod(signal, sampling_rate, x, fun, **kwargs):
peaks = fun(signal, sampling_rate=sampling_rate, **kwargs)
x += _ecg_findpeaks_promac_convolve(signal, peaks, sampling_rate=sampling_rate)
return x
def _ecg_findpeaks_promac_convolve(signal, peaks, sampling_rate=1000):
x = np.zeros(len(signal))
x[peaks] = 1
# Because a typical QRS is roughly defined within about 100ms
sd = sampling_rate / 10
shape = scipy.stats.norm.pdf(np.linspace(-sd * 4, sd * 4, num=int(sd * 8)), loc=0, scale=sd)
return np.convolve(x, shape, "same") # Return convolved
# =============================================================================
# NeuroKit
# =============================================================================
def _ecg_findpeaks_neurokit(
signal,
sampling_rate=1000,
smoothwindow=0.1,
avgwindow=0.75,
gradthreshweight=1.5,
minlenweight=0.4,
mindelay=0.3,
show=False,
):
"""All tune-able parameters are specified as keyword arguments.
The `signal` must be the highpass-filtered raw ECG with a lowcut of .5 Hz.
"""
if show is True:
__, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, sharex=True)
# Compute the ECG's gradient as well as the gradient threshold. Run with
# show=True in order to get an idea of the threshold.
grad = np.gradient(signal)
absgrad = np.abs(grad)
smooth_kernel = int(np.rint(smoothwindow * sampling_rate))
avg_kernel = int(np.rint(avgwindow * sampling_rate))
smoothgrad = signal_smooth(absgrad, kernel="boxcar", size=smooth_kernel)
avggrad = signal_smooth(smoothgrad, kernel="boxcar", size=avg_kernel)
gradthreshold = gradthreshweight * avggrad
mindelay = int(np.rint(sampling_rate * mindelay))
if show is True:
ax1.plot(signal)
ax2.plot(smoothgrad)
ax2.plot(gradthreshold)
# Identify start and end of QRS complexes.
qrs = smoothgrad > gradthreshold
beg_qrs = np.where(np.logical_and(np.logical_not(qrs[0:-1]), qrs[1:]))[0]
end_qrs = np.where(np.logical_and(qrs[0:-1], np.logical_not(qrs[1:])))[0]
# Throw out QRS-ends that precede first QRS-start.
end_qrs = end_qrs[end_qrs > beg_qrs[0]]
# Identify R-peaks within QRS (ignore QRS that are too short).
num_qrs = min(beg_qrs.size, end_qrs.size)
min_len = np.mean(end_qrs[:num_qrs] - beg_qrs[:num_qrs]) * minlenweight
peaks = [0]
for i in range(num_qrs):
beg = beg_qrs[i]
end = end_qrs[i]
len_qrs = end - beg
if len_qrs < min_len:
continue
if show is True:
ax2.axvspan(beg, end, facecolor="m", alpha=0.5)
# Find local maxima and their prominence within QRS.
data = signal[beg:end]
locmax, props = scipy.signal.find_peaks(data, prominence=(None, None))
if locmax.size > 0:
# Identify most prominent local maximum.
peak = beg + locmax[np.argmax(props["prominences"])]
# Enforce minimum delay between peaks.
if peak - peaks[-1] > mindelay:
peaks.append(peak)
peaks.pop(0)
if show is True:
ax1.scatter(peaks, signal[peaks], c="r")
peaks = np.asarray(peaks).astype(int) # Convert to int
return peaks
# =============================================================================
# Pan & Tompkins (1985)
# =============================================================================
def _ecg_findpeaks_pantompkins(signal, sampling_rate=1000):
"""From https://github.com/berndporr/py-ecg-detectors/
- Jiapu Pan and Willis J. Tompkins. A Real-Time QRS Detection Algorithm.
In: IEEE Transactions on Biomedical Engineering BME-32.3 (1985), pp. 230–236.
"""
diff = np.diff(signal)
squared = diff * diff
N = int(0.12 * sampling_rate)
mwa = _ecg_findpeaks_MWA(squared, N)
mwa[: int(0.2 * sampling_rate)] = 0
mwa_peaks = _ecg_findpeaks_peakdetect(mwa, sampling_rate)
mwa_peaks = np.array(mwa_peaks, dtype="int")
return mwa_peaks
# =============================================================================
# Hamilton (2002)
# =============================================================================
def _ecg_findpeaks_hamilton(signal, sampling_rate=1000):
"""From https://github.com/berndporr/py-ecg-detectors/
- Hamilton, Open Source ECG Analysis Software Documentation, E.P.Limited, 2002.
"""
diff = abs(np.diff(signal))
b = np.ones(int(0.08 * sampling_rate))
b = b / int(0.08 * sampling_rate)
a = [1]
ma = scipy.signal.lfilter(b, a, diff)
ma[0 : len(b) * 2] = 0
n_pks = []
n_pks_ave = 0.0
s_pks = []
s_pks_ave = 0.0
QRS = [0]
RR = []
RR_ave = 0.0
th = 0.0
i = 0
idx = []
peaks = []
for i in range(len(ma)): # pylint: disable=C0200,R1702
if i > 0 and i < len(ma) - 1 and ma[i - 1] < ma[i] and ma[i + 1] < ma[i]: # pylint: disable=R1716
peak = i
peaks.append(peak)
if ma[peak] > th and (peak - QRS[-1]) > 0.3 * sampling_rate:
QRS.append(peak)
idx.append(peak)
s_pks.append(ma[peak])
if len(n_pks) > 8:
s_pks.pop(0)
s_pks_ave = np.mean(s_pks)
if RR_ave != 0.0 and QRS[-1] - QRS[-2] > 1.5 * RR_ave:
missed_peaks = peaks[idx[-2] + 1 : idx[-1]]
for missed_peak in missed_peaks:
if missed_peak - peaks[idx[-2]] > int(0.360 * sampling_rate) and ma[missed_peak] > 0.5 * th:
QRS.append(missed_peak)
QRS.sort()
break
if len(QRS) > 2:
RR.append(QRS[-1] - QRS[-2])
if len(RR) > 8:
RR.pop(0)
RR_ave = int(np.mean(RR))
else:
n_pks.append(ma[peak])
if len(n_pks) > 8:
n_pks.pop(0)
n_pks_ave = np.mean(n_pks)
th = n_pks_ave + 0.45 * (s_pks_ave - n_pks_ave)
i += 1
QRS.pop(0)
QRS = np.array(QRS, dtype="int")
return QRS
# =============================================================================
# Slope Sum Function (SSF) - Zong et al. (2003)
# =============================================================================
def _ecg_findpeaks_ssf(signal, sampling_rate=1000, threshold=20, before=0.03, after=0.01):
"""From https://github.com/PIA-
Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L448.
- W. Zong, T. Heldt, G.B. Moody, and R.G. Mark. An open-source algorithm to detect onset of arterial
blood pressure pulses. In Computers in Cardiology, 2003, pages 259–262, 2003.
"""
# TODO: Doesn't really seems to work
# convert to samples
winB = int(before * sampling_rate)
winA = int(after * sampling_rate)
Rset = set()
length = len(signal)
# diff
dx = np.diff(signal)
dx[dx >= 0] = 0
dx = dx ** 2
# detection
(idx,) = np.nonzero(dx > threshold)
idx0 = np.hstack(([0], idx))
didx = np.diff(idx0)
# search
sidx = idx[didx > 1]
for item in sidx:
a = item - winB
if a < 0:
a = 0
b = item + winA
if b > length:
continue
r = np.argmax(signal[a:b]) + a
Rset.add(r)
# output
rpeaks = list(Rset)
rpeaks.sort()
rpeaks = np.array(rpeaks, dtype="int")
return rpeaks
# =============================================================================
# Christov (2004)
# =============================================================================
def _ecg_findpeaks_christov(signal, sampling_rate=1000):
"""From https://github.com/berndporr/py-ecg-detectors/
- Ivaylo I. Christov, Real time electrocardiogram QRS detection using combined adaptive threshold,
BioMedical Engineering OnLine 2004, vol. 3:28, 2004.
"""
total_taps = 0
b = np.ones(int(0.02 * sampling_rate))
b = b / int(0.02 * sampling_rate)
total_taps += len(b)
a = [1]
MA1 = scipy.signal.lfilter(b, a, signal)
b = np.ones(int(0.028 * sampling_rate))
b = b / int(0.028 * sampling_rate)
total_taps += len(b)
a = [1]
MA2 = scipy.signal.lfilter(b, a, MA1)
Y = []
for i in range(1, len(MA2) - 1):
diff = abs(MA2[i + 1] - MA2[i - 1])
Y.append(diff)
b = np.ones(int(0.040 * sampling_rate))
b = b / int(0.040 * sampling_rate)
total_taps += len(b)
a = [1]
MA3 = scipy.signal.lfilter(b, a, Y)
MA3[0:total_taps] = 0
ms50 = int(0.05 * sampling_rate)
ms200 = int(0.2 * sampling_rate)
ms1200 = int(1.2 * sampling_rate)
ms350 = int(0.35 * sampling_rate)
M = 0
newM5 = 0
M_list = []
MM = []
M_slope = np.linspace(1.0, 0.6, ms1200 - ms200)
F = 0
F_list = []
R = 0
RR = []
Rm = 0
R_list = []
MFR = 0
MFR_list = []
QRS = []
for i in range(len(MA3)): # pylint: disable=C0200
# M
if i < 5 * sampling_rate:
M = 0.6 * np.max(MA3[: i + 1])
MM.append(M)
if len(MM) > 5:
MM.pop(0)
elif QRS and i < QRS[-1] + ms200:
newM5 = 0.6 * np.max(MA3[QRS[-1] : i])
if newM5 > 1.5 * MM[-1]:
newM5 = 1.1 * MM[-1]
elif QRS and i == QRS[-1] + ms200:
if newM5 == 0:
newM5 = MM[-1]
MM.append(newM5)
if len(MM) > 5:
MM.pop(0)
M = np.mean(MM)
elif QRS and i > QRS[-1] + ms200 and i < QRS[-1] + ms1200:
M = np.mean(MM) * M_slope[i - (QRS[-1] + ms200)]
elif QRS and i > QRS[-1] + ms1200:
M = 0.6 * np.mean(MM)
# F
if i > ms350:
F_section = MA3[i - ms350 : i]
max_latest = np.max(F_section[-ms50:])
max_earliest = np.max(F_section[:ms50])
F += (max_latest - max_earliest) / 150.0
# R
if QRS and i < QRS[-1] + int((2.0 / 3.0 * Rm)):
R = 0
elif QRS and i > QRS[-1] + int((2.0 / 3.0 * Rm)) and i < QRS[-1] + Rm:
dec = (M - np.mean(MM)) / 1.4
R = 0 + dec
MFR = M + F + R
M_list.append(M)
F_list.append(F)
R_list.append(R)
MFR_list.append(MFR)
if not QRS and MA3[i] > MFR:
QRS.append(i)
elif QRS and i > QRS[-1] + ms200 and MA3[i] > MFR:
QRS.append(i)
if len(QRS) > 2:
RR.append(QRS[-1] - QRS[-2])
if len(RR) > 5:
RR.pop(0)
Rm = int(np.mean(RR))
QRS.pop(0)
QRS = np.array(QRS, dtype="int")
return QRS
# =============================================================================
# Gamboa (2008)
# =============================================================================
def _ecg_findpeaks_gamboa(signal, sampling_rate=1000, tol=0.002):
"""From https://github.com/PIA-
Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L834.
- Gamboa, H. (2008). Multi-modal behavioral biometrics based on hci and electrophysiology.
PhD ThesisUniversidade.
"""
hist, edges = np.histogram(signal, 100, density=True)
TH = 0.01
F = np.cumsum(hist)
v0 = edges[np.nonzero(F > TH)[0][0]]
v1 = edges[np.nonzero(F < (1 - TH))[0][-1]]
nrm = max([abs(v0), abs(v1)])
norm_signal = signal / float(nrm)
d2 = np.diff(norm_signal, 2)
b = np.nonzero((np.diff(np.sign(np.diff(-d2)))) == -2)[0] + 2 # pylint: disable=E1130
b = np.intersect1d(b, np.nonzero(-d2 > tol)[0]) # pylint: disable=E1130
rpeaks = []
if len(b) >= 3:
b = b.astype("float")
previous = b[0]
# convert to samples
v_100ms = int(0.1 * sampling_rate)
v_300ms = int(0.3 * sampling_rate)
for i in b[1:]:
if i - previous > v_300ms:
previous = i
rpeaks.append(np.argmax(signal[int(i) : int(i + v_100ms)]) + i)
rpeaks = sorted(list(set(rpeaks)))
rpeaks = np.array(rpeaks, dtype="int")
return rpeaks
# =============================================================================
# Engzee Modified (2012)
# =============================================================================
def _ecg_findpeaks_engzee(signal, sampling_rate=1000):
"""From https://github.com/berndporr/py-ecg-detectors/
- C. Zeelenberg, A single scan algorithm for QRS detection and feature extraction, IEEE Comp.
in Cardiology, vol. 6, pp. 37-42, 1979
- A. Lourenco, H. Silva, P. Leite, R. Lourenco and A. Fred, "Real Time Electrocardiogram Segmentation
for Finger Based ECG Biometrics", BIOSIGNALS 2012, pp. 49-54, 2012.
"""
engzee_fake_delay = 0
diff = np.zeros(len(signal))
for i in range(4, len(diff)):
diff[i] = signal[i] - signal[i - 4]
ci = [1, 4, 6, 4, 1]
low_pass = scipy.signal.lfilter(ci, 1, diff)
low_pass[: int(0.2 * sampling_rate)] = 0
ms200 = int(0.2 * sampling_rate)
ms1200 = int(1.2 * sampling_rate)
ms160 = int(0.16 * sampling_rate)
neg_threshold = int(0.01 * sampling_rate)
M = 0
M_list = []
neg_m = []
MM = []
M_slope = np.linspace(1.0, 0.6, ms1200 - ms200)
QRS = []
r_peaks = []
counter = 0
thi_list = []
thi = False
thf_list = []
thf = False
for i in range(len(low_pass)): # pylint: disable=C0200
# M
if i < 5 * sampling_rate:
M = 0.6 * np.max(low_pass[: i + 1])
MM.append(M)
if len(MM) > 5:
MM.pop(0)
elif QRS and i < QRS[-1] + ms200:
newM5 = 0.6 * np.max(low_pass[QRS[-1] : i])
if newM5 > 1.5 * MM[-1]:
newM5 = 1.1 * MM[-1]
elif QRS and i == QRS[-1] + ms200:
MM.append(newM5)
if len(MM) > 5:
MM.pop(0)
M = np.mean(MM)
elif QRS and i > QRS[-1] + ms200 and i < QRS[-1] + ms1200:
M = np.mean(MM) * M_slope[i - (QRS[-1] + ms200)]
elif QRS and i > QRS[-1] + ms1200:
M = 0.6 * np.mean(MM)
M_list.append(M)
neg_m.append(-M)
if not QRS and low_pass[i] > M:
QRS.append(i)
thi_list.append(i)
thi = True
elif QRS and i > QRS[-1] + ms200 and low_pass[i] > M:
QRS.append(i)
thi_list.append(i)
thi = True
if thi and i < thi_list[-1] + ms160:
if low_pass[i] < -M and low_pass[i - 1] > -M:
# thf_list.append(i)
thf = True
if thf and low_pass[i] < -M:
thf_list.append(i)
counter += 1
elif low_pass[i] > -M and thf:
counter = 0
thi = False
thf = False
elif thi and i > thi_list[-1] + ms160:
counter = 0
thi = False
thf = False
if counter > neg_threshold:
unfiltered_section = signal[thi_list[-1] - int(0.01 * sampling_rate) : i]
r_peaks.append(engzee_fake_delay + np.argmax(unfiltered_section) + thi_list[-1] - int(0.01 * sampling_rate))
counter = 0
thi = False
thf = False
r_peaks = np.array(r_peaks, dtype="int")
return r_peaks
# =============================================================================
# Stationary Wavelet Transform (SWT) - Kalidas and Tamil (2017)
# =============================================================================
def _ecg_findpeaks_kalidas(signal, sampling_rate=1000):
"""From https://github.com/berndporr/py-ecg-detectors/
- Vignesh Kalidas and Lakshman Tamil (2017). Real-time QRS detector using Stationary Wavelet Transform
for Automated ECG Analysis. In: 2017 IEEE 17th International Conference on Bioinformatics and
Bioengineering (BIBE). Uses the Pan and Tompkins thresolding.
"""
# Try loading pywt
try:
import pywt
except ImportError:
raise ImportError(
"NeuroKit error: ecg_findpeaks(): the 'PyWavelets' module is required for"
" this method to run. Please install it first (`pip install PyWavelets`)."
)
swt_level = 3
padding = -1
for i in range(1000):
if (len(signal) + i) % 2 ** swt_level == 0:
padding = i
break
if padding > 0:
signal = np.pad(signal, (0, padding), "edge")
elif padding == -1:
print("Padding greater than 1000 required\n")
swt_ecg = pywt.swt(signal, "db3", level=swt_level)
swt_ecg = np.array(swt_ecg)
swt_ecg = swt_ecg[0, 1, :]
squared = swt_ecg * swt_ecg
f1 = 0.01 / sampling_rate
f2 = 10 / sampling_rate
b, a = scipy.signal.butter(3, [f1 * 2, f2 * 2], btype="bandpass")
filtered_squared = scipy.signal.lfilter(b, a, squared)
filt_peaks = _ecg_findpeaks_peakdetect(filtered_squared, sampling_rate)
filt_peaks = np.array(filt_peaks, dtype="int")
return filt_peaks
# =============================================================================
# Elgendi et al. (2010)
# =============================================================================
def _ecg_findpeaks_elgendi(signal, sampling_rate=1000):
"""From https://github.com/berndporr/py-ecg-detectors/
- Elgendi, Mohamed & Jonkman, Mirjam & De Boer, Friso. (2010). Frequency Bands Effects on QRS Detection.
The 3rd International Conference on Bio-inspired Systems and Signal Processing (BIOSIGNALS2010).
428-431.
"""
window1 = int(0.12 * sampling_rate)
mwa_qrs = _ecg_findpeaks_MWA(abs(signal), window1)
window2 = int(0.6 * sampling_rate)
mwa_beat = _ecg_findpeaks_MWA(abs(signal), window2)
blocks = np.zeros(len(signal))
block_height = np.max(signal)
for i in range(len(mwa_qrs)): # pylint: disable=C0200
blocks[i] = block_height if mwa_qrs[i] > mwa_beat[i] else 0
QRS = []
for i in range(1, len(blocks)):
if blocks[i - 1] == 0 and blocks[i] == block_height:
start = i
elif blocks[i - 1] == block_height and blocks[i] == 0:
end = i - 1
if end - start > int(0.08 * sampling_rate):
detection = np.argmax(signal[start : end + 1]) + start
if QRS:
if detection - QRS[-1] > int(0.3 * sampling_rate):
QRS.append(detection)
else:
QRS.append(detection)
QRS = np.array(QRS, dtype="int")
return QRS
# =============================================================================
# Continuous Wavelet Transform (CWT) - Martinez et al. (2003)
# =============================================================================
#
def _ecg_findpeaks_WT(signal, sampling_rate=1000):
# Try loading pywt
try:
import pywt
except ImportError:
raise ImportError(
"NeuroKit error: ecg_delineator(): the 'PyWavelets' module is required for"
" this method to run. Please install it first (`pip install PyWavelets`)."
)
# first derivative of the Gaissian signal
scales = np.array([1, 2, 4, 8, 16])
cwtmatr, __ = pywt.cwt(signal, scales, "gaus1", sampling_period=1.0 / sampling_rate)
# For wt of scale 2^4
signal_4 = cwtmatr[4, :]
epsilon_4 = np.sqrt(np.mean(np.square(signal_4)))
peaks_4, _ = scipy.signal.find_peaks(np.abs(signal_4), height=epsilon_4)
# For wt of scale 2^3
signal_3 = cwtmatr[3, :]
epsilon_3 = np.sqrt(np.mean(np.square(signal_3)))
peaks_3, _ = scipy.signal.find_peaks(np.abs(signal_3), height=epsilon_3)
# Keep only peaks_3 that are nearest to peaks_4
peaks_3_keep = np.zeros_like(peaks_4)
for i in range(len(peaks_4)): # pylint: disable=C0200
peaks_distance = abs(peaks_4[i] - peaks_3)
peaks_3_keep[i] = peaks_3[np.argmin(peaks_distance)]
# For wt of scale 2^2
signal_2 = cwtmatr[2, :]
epsilon_2 = np.sqrt(np.mean(np.square(signal_2)))
peaks_2, _ = scipy.signal.find_peaks(np.abs(signal_2), height=epsilon_2)
# Keep only peaks_2 that are nearest to peaks_3
peaks_2_keep = np.zeros_like(peaks_4)
for i in range(len(peaks_4)):
peaks_distance = abs(peaks_3_keep[i] - peaks_2)
peaks_2_keep[i] = peaks_2[np.argmin(peaks_distance)]
# For wt of scale 2^1
signal_1 = cwtmatr[1, :]
epsilon_1 = np.sqrt(np.mean(np.square(signal_1)))
peaks_1, _ = scipy.signal.find_peaks(np.abs(signal_1), height=epsilon_1)
# Keep only peaks_1 that are nearest to peaks_2
peaks_1_keep = np.zeros_like(peaks_4)
for i in range(len(peaks_4)):
peaks_distance = abs(peaks_2_keep[i] - peaks_1)
peaks_1_keep[i] = peaks_1[np.argmin(peaks_distance)]
# Find R peaks
max_R_peak_dist = int(0.1 * sampling_rate)
rpeaks = []
for index_cur, index_next in zip(peaks_1_keep[:-1], peaks_1_keep[1:]):
correct_sign = signal_1[index_cur] < 0 and signal_1[index_next] > 0 # pylint: disable=R1716
near = (index_next - index_cur) < max_R_peak_dist # limit 2
if near and correct_sign:
rpeaks.append(signal_zerocrossings(signal_1[index_cur:index_next])[0] + index_cur)
rpeaks = np.array(rpeaks, dtype="int")
return rpeaks
# =============================================================================
# ASI (FSM based 2020)
# =============================================================================
def _ecg_findpeaks_rodrigues(signal, sampling_rate=1000):
"""Segmenter by Tiago Rodrigues, inspired by on Gutierrez-Rivas (2015) and Sadhukhan (2012).
References
----------
- Gutiérrez-Rivas, R., García, J. J., Marnane, W. P., & Hernández, A. (2015). Novel real-time
low-complexity QRS complex detector based on adaptive thresholding. IEEE Sensors Journal,
15(10), 6036-6043.
- Sadhukhan, D., & Mitra, M. (2012). R-peak detection algorithm for ECG using double difference
and RR interval processing. Procedia Technology, 4, 873-877.
"""
N = int(np.round(3 * sampling_rate / 128))
Nd = N - 1
Pth = (0.7 * sampling_rate) / 128 + 2.7
# Pth = 3, optimal for fs = 250 Hz
Rmin = 0.26
rpeaks = []
i = 1
tf = len(signal)
Ramptotal = 0
# Double derivative squared
diff_ecg = [signal[i] - signal[i - Nd] for i in range(Nd, len(signal))]
ddiff_ecg = [diff_ecg[i] - diff_ecg[i - 1] for i in range(1, len(diff_ecg))]
squar = np.square(ddiff_ecg)
# Integrate moving window
b = np.array(np.ones(N))
a = [1]
processed_ecg = scipy.signal.lfilter(b, a, squar)
# R-peak finder FSM
while i < tf - sampling_rate: # ignore last second of recording
# State 1: looking for maximum
tf1 = np.round(i + Rmin * sampling_rate)
Rpeakamp = 0
while i < tf1:
# Rpeak amplitude and position
if processed_ecg[i] > Rpeakamp:
Rpeakamp = processed_ecg[i]
rpeakpos = i + 1
i += 1
Ramptotal = (19 / 20) * Ramptotal + (1 / 20) * Rpeakamp
rpeaks.append(rpeakpos)
# State 2: waiting state
d = tf1 - rpeakpos
tf2 = i + np.round(0.2 * 2 - d)
while i <= tf2:
i += 1
# State 3: decreasing threshold
Thr = Ramptotal
while processed_ecg[i] < Thr:
Thr *= np.exp(-Pth / sampling_rate)
i += 1
return rpeaks
# =============================================================================
# Utilities
# =============================================================================
def _ecg_findpeaks_MWA(signal, window_size):
"""From https://github.com/berndporr/py-ecg-detectors/"""
mwa = np.zeros(len(signal))
sums = np.cumsum(signal)
def get_mean(begin, end):
if begin == 0:
return sums[end - 1] / end
dif = sums[end - 1] - sums[begin - 1]
return dif / (end - begin)
for i in range(len(signal)): # pylint: disable=C0200
if i < window_size:
section = signal[0:i]
else:
section = get_mean(i - window_size, i)
if i != 0:
mwa[i] = np.mean(section)
else:
mwa[i] = signal[i]
return mwa
def _ecg_findpeaks_peakdetect(detection, sampling_rate=1000):
"""From https://github.com/berndporr/py-ecg-detectors/"""
min_distance = int(0.25 * sampling_rate)
signal_peaks = [0]
noise_peaks = []
SPKI = 0.0
NPKI = 0.0
threshold_I1 = 0.0
threshold_I2 = 0.0
RR_missed = 0
index = 0
indexes = []
missed_peaks = []
peaks = []
for i in range(len(detection)): # pylint: disable=R1702,C0200
# pylint: disable=R1716
if i > 0 and i < len(detection) - 1 and detection[i - 1] < detection[i] and detection[i + 1] < detection[i]:
peak = i
peaks.append(peak) # pylint: disable=R1716
if detection[peak] > threshold_I1 and (peak - signal_peaks[-1]) > 0.3 * sampling_rate:
signal_peaks.append(peak)
indexes.append(index)
SPKI = 0.125 * detection[signal_peaks[-1]] + 0.875 * SPKI
if RR_missed != 0 and signal_peaks[-1] - signal_peaks[-2] > RR_missed:
missed_section_peaks = peaks[indexes[-2] + 1 : indexes[-1]]
missed_section_peaks2 = []
for missed_peak in missed_section_peaks:
if missed_peak - signal_peaks[-2] > min_distance:
if signal_peaks[-1] - missed_peak > min_distance:
if detection[missed_peak] > threshold_I2:
missed_section_peaks2.append(missed_peak)
if missed_section_peaks2:
missed_peak = missed_section_peaks2[np.argmax(detection[missed_section_peaks2])]
missed_peaks.append(missed_peak)
signal_peaks.append(signal_peaks[-1])
signal_peaks[-2] = missed_peak
else:
noise_peaks.append(peak)
NPKI = 0.125 * detection[noise_peaks[-1]] + 0.875 * NPKI
threshold_I1 = NPKI + 0.25 * (SPKI - NPKI)
threshold_I2 = 0.5 * threshold_I1
if len(signal_peaks) > 8:
RR = np.diff(signal_peaks[-9:])
RR_ave = int(np.mean(RR))
RR_missed = int(1.66 * RR_ave)
index += 1
signal_peaks.pop(0)
return signal_peaks