# -*- coding: utf-8 -*-
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.signal
from ..epochs import epochs_create, epochs_to_df
from ..signal import signal_findpeaks, signal_formatpeaks, signal_resample, signal_smooth, signal_zerocrossings
from ..stats import standardize
from .ecg_peaks import ecg_peaks
from .ecg_segment import ecg_segment
[docs]def ecg_delineate(
ecg_cleaned, rpeaks=None, sampling_rate=1000, method="peak", show=False, show_type="peaks", check=False
):
"""Delineate QRS complex.
Function to delineate the QRS complex.
- **Cardiac Cycle**: A typical ECG heartbeat consists of a P wave, a QRS complex and a T wave.
The P wave represents the wave of depolarization that spreads from the SA-node throughout the atria.
The QRS complex reflects the rapid depolarization of the right and left ventricles. Since the
ventricles are the largest part of the heart, in terms of mass, the QRS complex usually has a much
larger amplitude than the P-wave. The T wave represents the ventricular repolarization of the
ventricles.On rare occasions, a U wave can be seen following the T wave. The U wave is believed
to be related to the last remnants of ventricular repolarization.
Parameters
----------
ecg_cleaned : Union[list, np.array, pd.Series]
The cleaned ECG channel as returned by `ecg_clean()`.
rpeaks : Union[list, np.array, pd.Series]
The samples at which R-peaks occur. Accessible with the key "ECG_R_Peaks" in the info dictionary
returned by `ecg_findpeaks()`.
sampling_rate : int
The sampling frequency of `ecg_signal` (in Hz, i.e., samples/second).
Defaults to 500.
method : str
Can be one of 'peak' (default) for a peak-based method, 'cwt' for continuous wavelet transform
or 'dwt' for discrete wavelet transform.
show : bool
If True, will return a plot to visualizing the delineated waves
information.
show_type: str
The type of delineated waves information showed in the plot.
check : bool
Defaults to False.
Returns
-------
waves : dict
A dictionary containing additional information.
For derivative method, the dictionary contains the samples at which P-peaks, Q-peaks, S-peaks,
T-peaks, P-onsets and T-offsets occur, accessible with the key "ECG_P_Peaks", "ECG_Q_Peaks",
"ECG_S_Peaks", "ECG_T_Peaks", "ECG_P_Onsets", "ECG_T_Offsets" respectively.
For wavelet methods, the dictionary contains the samples at which P-peaks, T-peaks, P-onsets,
P-offsets, T-onsets, T-offsets, QRS-onsets and QRS-offsets occur, accessible with the key
"ECG_P_Peaks", "ECG_T_Peaks", "ECG_P_Onsets", "ECG_P_Offsets", "ECG_T_Onsets", "ECG_T_Offsets",
"ECG_R_Onsets", "ECG_R_Offsets" respectively.
signals : DataFrame
A DataFrame of same length as the input signal in which occurences of
peaks, onsets and offsets marked as "1" in a list of zeros.
See Also
--------
ecg_clean, signal_fixpeaks, ecg_peaks, signal_rate, ecg_process, ecg_plot
Examples
--------
>>> import neurokit2 as nk
>>>
>>> ecg = nk.ecg_simulate(duration=10, sampling_rate=1000)
>>> cleaned = nk.ecg_clean(ecg, sampling_rate=1000)
>>> _, rpeaks = nk.ecg_peaks(cleaned)
>>> signals, waves = nk.ecg_delineate(cleaned, rpeaks, sampling_rate=1000, method="peak")
>>> nk.events_plot(waves["ECG_P_Peaks"], cleaned) #doctest: +ELLIPSIS
<Figure ...>
>>> nk.events_plot(waves["ECG_T_Peaks"], cleaned) #doctest: +ELLIPSIS
<Figure ...>
References
--------------
- Martínez, J. P., Almeida, R., Olmos, S., Rocha, A. P., & Laguna, P. (2004). A wavelet-based ECG
delineator: evaluation on standard databases. IEEE Transactions on biomedical engineering,
51(4), 570-581.
"""
# Sanitize input for ecg_cleaned
if isinstance(ecg_cleaned, pd.DataFrame):
cols = [col for col in ecg_cleaned.columns if "ECG_Clean" in col]
if cols:
ecg_cleaned = ecg_cleaned[cols[0]].values
else:
raise ValueError("NeuroKit error: ecg_delineate(): Wrong input, we couldn't extract" "cleaned signal.")
elif isinstance(ecg_cleaned, dict):
for i in ecg_cleaned:
cols = [col for col in ecg_cleaned[i].columns if "ECG_Clean" in col]
if cols:
signals = epochs_to_df(ecg_cleaned)
ecg_cleaned = signals[cols[0]].values
else:
raise ValueError("NeuroKit error: ecg_delineate(): Wrong input, we couldn't extract" "cleaned signal.")
# Sanitize input for rpeaks
if rpeaks is None:
_, rpeaks = ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate)
rpeaks = rpeaks["ECG_R_Peaks"]
if isinstance(rpeaks, dict):
rpeaks = rpeaks["ECG_R_Peaks"]
method = method.lower() # remove capitalised letters
if method in ["peak", "peaks", "derivative", "gradient"]:
waves = _ecg_delineator_peak(ecg_cleaned, rpeaks=rpeaks, sampling_rate=sampling_rate)
elif method in ["cwt", "continuous wavelet transform"]:
waves = _ecg_delineator_cwt(ecg_cleaned, rpeaks=rpeaks, sampling_rate=sampling_rate)
elif method in ["dwt", "discrete wavelet transform"]:
waves = _dwt_ecg_delineator(ecg_cleaned, rpeaks, sampling_rate=sampling_rate)
else:
raise ValueError("NeuroKit error: ecg_delineate(): 'method' should be one of 'peak'," "'cwt' or 'dwt'.")
# Remove NaN in Peaks, Onsets, and Offsets
waves_noNA = waves.copy()
for feature in waves_noNA.keys():
waves_noNA[feature] = [int(x) for x in waves_noNA[feature] if ~np.isnan(x)]
instant_peaks = signal_formatpeaks(waves_noNA, desired_length=len(ecg_cleaned))
signals = instant_peaks
if show is True:
_ecg_delineate_plot(
ecg_cleaned, rpeaks=rpeaks, signals=signals, signal_features_type=show_type, sampling_rate=sampling_rate
)
if check is True:
waves = _ecg_delineate_check(waves, rpeaks)
return signals, waves
# =============================================================================
# WAVELET METHOD (DWT)
# =============================================================================
def _dwt_resample_points(peaks, sampling_rate, desired_sampling_rate):
"""Resample given points to a different sampling rate."""
peaks_resample = np.array(peaks) * desired_sampling_rate / sampling_rate
peaks_resample = [np.nan if np.isnan(x) else int(x) for x in peaks_resample.tolist()]
return peaks_resample
def _dwt_ecg_delineator(ecg, rpeaks, sampling_rate, analysis_sampling_rate=2000):
"""Delinate ecg signal using discrete wavelet transforms.
Parameters
----------
ecg : Union[list, np.array, pd.Series]
The cleaned ECG channel as returned by `ecg_clean()`.
rpeaks : Union[list, np.array, pd.Series]
The samples at which R-peaks occur. Accessible with the key "ECG_R_Peaks" in the info dictionary
returned by `ecg_findpeaks()`.
sampling_rate : int
The sampling frequency of `ecg_signal` (in Hz, i.e., samples/second).
analysis_sampling_rate : int
The sampling frequency for analysis (in Hz, i.e., samples/second).
Returns
--------
dict
Dictionary of the points.
"""
ecg = signal_resample(ecg, sampling_rate=sampling_rate, desired_sampling_rate=analysis_sampling_rate)
dwtmatr = _dwt_compute_multiscales(ecg, 9)
# # only for debugging
# for idx in [0, 1, 2, 3]:
# plt.plot(dwtmatr[idx + 3], label=f'W[{idx}]')
# plt.plot(ecg, '--')
# plt.legend()
# plt.grid(True)
# plt.show()
rpeaks_resampled = _dwt_resample_points(rpeaks, sampling_rate, analysis_sampling_rate)
tpeaks, ppeaks = _dwt_delineate_tp_peaks(ecg, rpeaks_resampled, dwtmatr, sampling_rate=analysis_sampling_rate)
qrs_onsets, qrs_offsets = _dwt_delineate_qrs_bounds(
rpeaks_resampled, dwtmatr, ppeaks, tpeaks, sampling_rate=analysis_sampling_rate
)
ponsets, poffsets = _dwt_delineate_tp_onsets_offsets(ppeaks, dwtmatr, sampling_rate=analysis_sampling_rate)
tonsets, toffsets = _dwt_delineate_tp_onsets_offsets(
tpeaks, dwtmatr, sampling_rate=analysis_sampling_rate, onset_weight=0.6, duration=0.6
)
return dict(
ECG_T_Peaks=_dwt_resample_points(tpeaks, analysis_sampling_rate, desired_sampling_rate=sampling_rate),
ECG_T_Onsets=_dwt_resample_points(tonsets, analysis_sampling_rate, desired_sampling_rate=sampling_rate),
ECG_T_Offsets=_dwt_resample_points(toffsets, analysis_sampling_rate, desired_sampling_rate=sampling_rate),
ECG_P_Peaks=_dwt_resample_points(ppeaks, analysis_sampling_rate, desired_sampling_rate=sampling_rate),
ECG_P_Onsets=_dwt_resample_points(ponsets, analysis_sampling_rate, desired_sampling_rate=sampling_rate),
ECG_P_Offsets=_dwt_resample_points(poffsets, analysis_sampling_rate, desired_sampling_rate=sampling_rate),
ECG_R_Onsets=_dwt_resample_points(qrs_onsets, analysis_sampling_rate, desired_sampling_rate=sampling_rate),
ECG_R_Offsets=_dwt_resample_points(qrs_offsets, analysis_sampling_rate, desired_sampling_rate=sampling_rate),
)
def _dwt_compensate_degree(sampling_rate):
return int(np.log2(sampling_rate / 250))
def _dwt_delineate_tp_peaks(
ecg,
rpeaks,
dwtmatr,
sampling_rate=250,
qrs_width=0.13,
p2r_duration=0.2,
rt_duration=0.25,
degree_tpeak=3,
degree_ppeak=2,
epsilon_T_weight=0.25,
epsilon_P_weight=0.02,
):
srch_bndry = int(0.5 * qrs_width * sampling_rate)
degree_add = _dwt_compensate_degree(sampling_rate)
tpeaks = []
for rpeak_ in rpeaks:
if np.isnan(rpeak_):
tpeaks.append(np.nan)
continue
# search for T peaks from R peaks
srch_idx_start = rpeak_ + srch_bndry
srch_idx_end = rpeak_ + 2 * int(rt_duration * sampling_rate)
dwt_local = dwtmatr[degree_tpeak + degree_add, srch_idx_start:srch_idx_end]
height = epsilon_T_weight * np.sqrt(np.mean(np.square(dwt_local)))
if len(dwt_local) == 0:
tpeaks.append(np.nan)
continue
ecg_local = ecg[srch_idx_start:srch_idx_end]
peaks, __ = scipy.signal.find_peaks(np.abs(dwt_local), height=height)
peaks = list(filter(lambda p: np.abs(dwt_local[p]) > 0.025 * max(dwt_local), peaks)) # pylint: disable=W0640
if dwt_local[0] > 0: # just append
peaks = [0] + peaks
# detect morphology
candidate_peaks = []
candidate_peaks_scores = []
for idx_peak, idx_peak_nxt in zip(peaks[:-1], peaks[1:]):
correct_sign = dwt_local[idx_peak] > 0 and dwt_local[idx_peak_nxt] < 0 # pylint: disable=R1716
if correct_sign:
idx_zero = signal_zerocrossings(dwt_local[idx_peak:idx_peak_nxt])[0] + idx_peak
# This is the score assigned to each peak. The peak with the highest score will be
# selected.
score = ecg_local[idx_zero] - (float(idx_zero) / sampling_rate - (rt_duration - 0.5 * qrs_width))
candidate_peaks.append(idx_zero)
candidate_peaks_scores.append(score)
if not candidate_peaks:
tpeaks.append(np.nan)
continue
tpeaks.append(candidate_peaks[np.argmax(candidate_peaks_scores)] + srch_idx_start)
ppeaks = []
for rpeak in rpeaks:
if np.isnan(rpeak):
ppeaks.append(np.nan)
continue
# search for P peaks from Rpeaks
srch_idx_start = rpeak - 2 * int(p2r_duration * sampling_rate)
srch_idx_end = rpeak - srch_bndry
dwt_local = dwtmatr[degree_ppeak + degree_add, srch_idx_start:srch_idx_end]
height = epsilon_P_weight * np.sqrt(np.mean(np.square(dwt_local)))
if len(dwt_local) == 0:
ppeaks.append(np.nan)
continue
ecg_local = ecg[srch_idx_start:srch_idx_end]
peaks, __ = scipy.signal.find_peaks(np.abs(dwt_local), height=height)
peaks = list(filter(lambda p: np.abs(dwt_local[p]) > 0.025 * max(dwt_local), peaks))
if dwt_local[0] > 0: # just append
peaks = [0] + peaks
# detect morphology
candidate_peaks = []
candidate_peaks_scores = []
for idx_peak, idx_peak_nxt in zip(peaks[:-1], peaks[1:]):
correct_sign = dwt_local[idx_peak] > 0 and dwt_local[idx_peak_nxt] < 0 # pylint: disable=R1716
if correct_sign:
idx_zero = signal_zerocrossings(dwt_local[idx_peak:idx_peak_nxt])[0] + idx_peak
# This is the score assigned to each peak. The peak with the highest score will be
# selected.
score = ecg_local[idx_zero] - abs(
float(idx_zero) / sampling_rate - p2r_duration
) # Minus p2r because of the srch_idx_start
candidate_peaks.append(idx_zero)
candidate_peaks_scores.append(score)
if not candidate_peaks:
ppeaks.append(np.nan)
continue
ppeaks.append(candidate_peaks[np.argmax(candidate_peaks_scores)] + srch_idx_start)
return tpeaks, ppeaks
def _dwt_delineate_tp_onsets_offsets(
peaks,
dwtmatr,
sampling_rate=250,
duration=0.3,
duration_offset=0.3,
onset_weight=0.4,
offset_weight=0.4,
degree_onset=2,
degree_offset=2,
):
degree = _dwt_compensate_degree(sampling_rate)
onsets = []
offsets = []
for i in range(len(peaks)): # pylint: disable=C0200
if np.isnan(peaks[i]):
onsets.append(np.nan)
offsets.append(np.nan)
continue
# look for onsets
srch_idx_start = peaks[i] - int(duration * sampling_rate)
srch_idx_end = peaks[i]
if srch_idx_start is np.nan or srch_idx_end is np.nan:
onsets.append(np.nan)
offsets.append(np.nan)
continue
dwt_local = dwtmatr[degree_onset + degree, srch_idx_start:srch_idx_end]
onset_slope_peaks, __ = scipy.signal.find_peaks(dwt_local)
try:
epsilon_onset = onset_weight * dwt_local[onset_slope_peaks[-1]]
candidate_onsets = np.where(dwt_local[: onset_slope_peaks[-1]] < epsilon_onset)[0]
onsets.append(candidate_onsets[-1] + srch_idx_start)
except IndexError:
onsets.append(np.nan)
# # only for debugging
# events_plot([candidate_onsets, onset_slope_peaks], dwt_local)
# plt.plot(ecg[srch_idx_start: srch_idx_end], '--', label='ecg')
# plt.show()
# look for offset
srch_idx_start = peaks[i]
srch_idx_end = peaks[i] + int(duration_offset * sampling_rate)
if srch_idx_start is np.nan or srch_idx_end is np.nan:
offsets.append(np.nan)
continue
dwt_local = dwtmatr[degree_offset + degree, srch_idx_start:srch_idx_end]
offset_slope_peaks, __ = scipy.signal.find_peaks(-dwt_local)
try:
epsilon_offset = -offset_weight * dwt_local[offset_slope_peaks[0]]
candidate_offsets = (
np.where(-dwt_local[offset_slope_peaks[0] :] < epsilon_offset)[0] + offset_slope_peaks[0]
)
offsets.append(candidate_offsets[0] + srch_idx_start)
except IndexError:
offsets.append(np.nan)
# # only for debugging
# events_plot([candidate_offsets, offset_slope_peaks], dwt_local)
# plt.plot(ecg[srch_idx_start: srch_idx_end], '--', label='ecg')
# plt.show()
return onsets, offsets
def _dwt_delineate_qrs_bounds(rpeaks, dwtmatr, ppeaks, tpeaks, sampling_rate=250):
degree = int(np.log2(sampling_rate / 250))
onsets = []
for i in range(len(rpeaks)): # pylint: disable=C0200
# look for onsets
srch_idx_start = ppeaks[i]
srch_idx_end = rpeaks[i]
if srch_idx_start is np.nan or srch_idx_end is np.nan:
onsets.append(np.nan)
continue
dwt_local = dwtmatr[2 + degree, srch_idx_start:srch_idx_end]
onset_slope_peaks, __ = scipy.signal.find_peaks(-dwt_local)
epsilon_onset = 0.5 * -dwt_local[onset_slope_peaks[-1]]
candidate_onsets = np.where(-dwt_local[: onset_slope_peaks[-1]] < epsilon_onset)[0]
onsets.append(candidate_onsets[-1] + srch_idx_start)
# # only for debugging
# events_plot(candidate_onsets, -dwt_local)
# plt.plot(ecg[srch_idx_start: srch_idx_end], '--', label='ecg')
# plt.legend()
# plt.show()
offsets = []
for i in range(len(rpeaks)): # pylint: disable=C0200
# look for offsets
srch_idx_start = rpeaks[i]
srch_idx_end = tpeaks[i]
if srch_idx_start is np.nan or srch_idx_end is np.nan:
offsets.append(np.nan)
continue
dwt_local = dwtmatr[2 + degree, srch_idx_start:srch_idx_end]
onset_slope_peaks, __ = scipy.signal.find_peaks(dwt_local)
if len(onset_slope_peaks) == 0:
offsets.append(np.nan)
continue
epsilon_offset = 0.5 * dwt_local[onset_slope_peaks[0]]
if not (dwt_local[onset_slope_peaks[0] :] < epsilon_offset).any():
offsets.append(np.nan)
continue
candidate_offsets = np.where(dwt_local[onset_slope_peaks[0] :] < epsilon_offset)[0] + onset_slope_peaks[0]
offsets.append(candidate_offsets[0] + srch_idx_start)
return onsets, offsets
def _dwt_compute_multiscales(ecg: np.ndarray, max_degree):
"""Return multiscales wavelet transforms."""
def _apply_H_filter(signal_i, power=0):
zeros = np.zeros(2 ** power - 1)
timedelay = 2 ** power
banks = np.r_[
1.0 / 8, zeros, 3.0 / 8, zeros, 3.0 / 8, zeros, 1.0 / 8,
]
signal_f = scipy.signal.convolve(signal_i, banks, mode="full")
signal_f[:-timedelay] = signal_f[timedelay:] # timeshift: 2 steps
return signal_f
def _apply_G_filter(signal_i, power=0):
zeros = np.zeros(2 ** power - 1)
timedelay = 2 ** power
banks = np.r_[2, zeros, -2]
signal_f = scipy.signal.convolve(signal_i, banks, mode="full")
signal_f[:-timedelay] = signal_f[timedelay:] # timeshift: 1 step
return signal_f
dwtmatr = []
intermediate_ret = np.array(ecg)
for deg in range(max_degree):
S_deg = _apply_G_filter(intermediate_ret, power=deg)
T_deg = _apply_H_filter(intermediate_ret, power=deg)
dwtmatr.append(S_deg)
intermediate_ret = np.array(T_deg)
dwtmatr = [arr[: len(ecg)] for arr in dwtmatr] # rescale transforms to the same length
return np.array(dwtmatr)
# =============================================================================
# WAVELET METHOD (CWT)
# =============================================================================
def _ecg_delineator_cwt(ecg, rpeaks=None, sampling_rate=1000):
# P-Peaks and T-Peaks
tpeaks, ppeaks = _peaks_delineator(ecg, rpeaks, sampling_rate=sampling_rate)
# qrs onsets and offsets
qrs_onsets, qrs_offsets = _onset_offset_delineator(ecg, rpeaks, peak_type="rpeaks", sampling_rate=sampling_rate)
# ppeaks onsets and offsets
p_onsets, p_offsets = _onset_offset_delineator(ecg, ppeaks, peak_type="ppeaks", sampling_rate=sampling_rate)
# tpeaks onsets and offsets
t_onsets, t_offsets = _onset_offset_delineator(ecg, tpeaks, peak_type="tpeaks", sampling_rate=sampling_rate)
# Return info dictionary
return {
"ECG_P_Peaks": ppeaks,
"ECG_T_Peaks": tpeaks,
"ECG_R_Onsets": qrs_onsets,
"ECG_R_Offsets": qrs_offsets,
"ECG_P_Onsets": p_onsets,
"ECG_P_Offsets": p_offsets,
"ECG_T_Onsets": t_onsets,
"ECG_T_Offsets": t_offsets,
}
# Internals
# ---------------------
def _onset_offset_delineator(ecg, peaks, peak_type="rpeaks", sampling_rate=1000):
# Try loading pywt
try:
import pywt
except ImportError:
raise ImportError(
"NeuroKit error: ecg_delineator(): the 'PyWavelets' module is required for this method to run. ",
"Please install it first (`pip install PyWavelets`).",
)
# first derivative of the Gaissian signal
scales = np.array([1, 2, 4, 8, 16])
cwtmatr, __ = pywt.cwt(ecg, scales, "gaus1", sampling_period=1.0 / sampling_rate)
half_wave_width = int(0.1 * sampling_rate) # NEED TO CHECK
onsets = []
offsets = []
for index_peak in peaks:
# find onset
if peak_type == "rpeaks":
search_window = cwtmatr[2, index_peak - half_wave_width : index_peak]
prominence = 0.20 * max(search_window)
height = 0.0
wt_peaks, wt_peaks_data = scipy.signal.find_peaks(search_window, height=height, prominence=prominence)
elif peak_type in ["tpeaks", "ppeaks"]:
search_window = -cwtmatr[4, index_peak - half_wave_width : index_peak]
prominence = 0.10 * max(search_window)
height = 0.0
wt_peaks, wt_peaks_data = scipy.signal.find_peaks(search_window, height=height, prominence=prominence)
if len(wt_peaks) == 0:
# print("Fail to find onset at index: %d", index_peak)
continue
# The last peak is nfirst in (Martinez, 2004)
nfirst = wt_peaks[-1] + index_peak - half_wave_width
if peak_type == "rpeaks":
if wt_peaks_data["peak_heights"][-1] > 0:
epsilon_onset = 0.05 * wt_peaks_data["peak_heights"][-1]
elif peak_type == "ppeaks":
epsilon_onset = 0.50 * wt_peaks_data["peak_heights"][-1]
elif peak_type == "tpeaks":
epsilon_onset = 0.25 * wt_peaks_data["peak_heights"][-1]
leftbase = wt_peaks_data["left_bases"][-1] + index_peak - half_wave_width
if peak_type == "rpeaks":
candidate_onsets = np.where(cwtmatr[2, nfirst - 100 : nfirst] < epsilon_onset)[0] + nfirst - 100
elif peak_type in ["tpeaks", "ppeaks"]:
candidate_onsets = np.where(-cwtmatr[4, nfirst - 100 : nfirst] < epsilon_onset)[0] + nfirst - 100
candidate_onsets = candidate_onsets.tolist() + [leftbase]
if len(candidate_onsets) == 0:
onsets.append(np.nan)
else:
onsets.append(max(candidate_onsets))
# find offset
if peak_type == "rpeaks":
search_window = -cwtmatr[2, index_peak : index_peak + half_wave_width]
prominence = 0.50 * max(search_window)
wt_peaks, wt_peaks_data = scipy.signal.find_peaks(search_window, height=height, prominence=prominence)
elif peak_type in ["tpeaks", "ppeaks"]:
search_window = cwtmatr[4, index_peak : index_peak + half_wave_width]
prominence = 0.10 * max(search_window)
wt_peaks, wt_peaks_data = scipy.signal.find_peaks(search_window, height=height, prominence=prominence)
if len(wt_peaks) == 0:
# print("Fail to find offsets at index: %d", index_peak)
continue
nlast = wt_peaks[0] + index_peak
if peak_type == "rpeaks":
if wt_peaks_data["peak_heights"][0] > 0:
epsilon_offset = 0.125 * wt_peaks_data["peak_heights"][0]
elif peak_type == "ppeaks":
epsilon_offset = 0.9 * wt_peaks_data["peak_heights"][0]
elif peak_type == "tpeaks":
epsilon_offset = 0.4 * wt_peaks_data["peak_heights"][0]
rightbase = wt_peaks_data["right_bases"][0] + index_peak
if peak_type == "rpeaks":
candidate_offsets = np.where((-cwtmatr[2, nlast : nlast + 100]) < epsilon_offset)[0] + nlast
elif peak_type in ["tpeaks", "ppeaks"]:
candidate_offsets = np.where((cwtmatr[4, nlast : nlast + 100]) < epsilon_offset)[0] + nlast
candidate_offsets = candidate_offsets.tolist() + [rightbase]
if len(candidate_offsets) == 0:
offsets.append(np.nan)
else:
offsets.append(min(candidate_offsets))
onsets = np.array(onsets, dtype="int")
offsets = np.array(offsets, dtype="int")
return onsets, offsets
def _peaks_delineator(ecg, rpeaks, sampling_rate=1000):
# Try loading pywt
try:
import pywt
except ImportError:
raise ImportError(
"NeuroKit error: ecg_delineator(): the 'PyWavelets' module is required for this method to run. ",
"Please install it first (`pip install PyWavelets`).",
)
# first derivative of the Gaissian signal
scales = np.array([1, 2, 4, 8, 16])
cwtmatr, __ = pywt.cwt(ecg, scales, "gaus1", sampling_period=1.0 / sampling_rate)
qrs_duration = 0.1
search_boundary = int(0.9 * qrs_duration * sampling_rate / 2)
significant_peaks_groups = []
for i in range(len(rpeaks) - 1):
# search for T peaks and P peaks from R peaks
start = rpeaks[i] + search_boundary
end = rpeaks[i + 1] - search_boundary
search_window = cwtmatr[4, start:end]
height = 0.25 * np.sqrt(np.mean(np.square(search_window)))
peaks_tp, heights_tp = scipy.signal.find_peaks(np.abs(search_window), height=height)
peaks_tp = peaks_tp + rpeaks[i] + search_boundary
# set threshold for heights of peaks to find significant peaks in wavelet
threshold = 0.125 * max(search_window)
significant_index = []
significant_index = [j for j in range(len(peaks_tp)) if heights_tp["peak_heights"][j] > threshold]
significant_peaks_tp = []
for index in significant_index:
significant_peaks_tp.append(peaks_tp[index])
significant_peaks_groups.append(_find_tppeaks(ecg, significant_peaks_tp, sampling_rate=sampling_rate))
tpeaks, ppeaks = zip(*[(g[0], g[-1]) for g in significant_peaks_groups])
tpeaks = np.array(tpeaks, dtype="int")
ppeaks = np.array(ppeaks, dtype="int")
return tpeaks, ppeaks
def _find_tppeaks(ecg, keep_tp, sampling_rate=1000):
# Try loading pywt
try:
import pywt
except ImportError:
raise ImportError(
"NeuroKit error: ecg_delineator(): the 'PyWavelets' module is required for this method to run. ",
"Please install it first (`pip install PyWavelets`).",
)
# first derivative of the Gaissian signal
scales = np.array([1, 2, 4, 8, 16])
cwtmatr, __ = pywt.cwt(ecg, scales, "gaus1", sampling_period=1.0 / sampling_rate)
max_search_duration = 0.05
tppeaks = []
for index_cur, index_next in zip(keep_tp[:-1], keep_tp[1:]):
# limit 1
correct_sign = cwtmatr[4, :][index_cur] < 0 and cwtmatr[4, :][index_next] > 0 # pylint: disable=R1716
# near = (index_next - index_cur) < max_wv_peak_dist #limit 2
# if near and correct_sign:
if correct_sign:
index_zero_cr = signal_zerocrossings(cwtmatr[4, :][index_cur:index_next])[0] + index_cur
nb_idx = int(max_search_duration * sampling_rate)
index_max = np.argmax(ecg[index_zero_cr - nb_idx : index_zero_cr + nb_idx]) + (index_zero_cr - nb_idx)
tppeaks.append(index_max)
return tppeaks
# =============================================================================
# PEAK METHOD
# =============================================================================
def _ecg_delineator_peak(ecg, rpeaks=None, sampling_rate=1000):
# Initialize
heartbeats = ecg_segment(ecg, rpeaks, sampling_rate)
Q_list = []
P_list = []
S_list = []
T_list = []
P_onsets = []
T_offsets = []
for i, rpeak in enumerate(rpeaks):
heartbeat = heartbeats[str(i + 1)]
# Get index of heartbeat
R = heartbeat.index.get_loc(np.min(heartbeat.index.values[heartbeat.index.values > 0]))
# Peaks ------
# Q wave
Q_index, Q = _ecg_delineator_peak_Q(rpeak, heartbeat, R)
Q_list.append(Q_index)
# P wave
P_index, P = _ecg_delineator_peak_P(rpeak, heartbeat, R, Q)
P_list.append(P_index)
# S wave
S_index, S = _ecg_delineator_peak_S(rpeak, heartbeat)
S_list.append(S_index)
# T wave
T_index, T = _ecg_delineator_peak_T(rpeak, heartbeat, R, S)
T_list.append(T_index)
# Onsets/Offsets ------
P_onsets.append(_ecg_delineator_peak_P_onset(rpeak, heartbeat, R, P))
T_offsets.append(_ecg_delineator_peak_T_offset(rpeak, heartbeat, R, T))
# Return info dictionary
return {
"ECG_P_Peaks": P_list,
"ECG_Q_Peaks": Q_list,
"ECG_S_Peaks": S_list,
"ECG_T_Peaks": T_list,
"ECG_P_Onsets": P_onsets,
"ECG_T_Offsets": T_offsets,
}
# Internal
# --------------------------
def _ecg_delineator_peak_Q(rpeak, heartbeat, R):
segment = heartbeat[:0] # Select left hand side
Q = signal_findpeaks(-1 * segment["Signal"], height_min=0.05 * (segment["Signal"].max() - segment["Signal"].min()))
if len(Q["Peaks"]) == 0:
return np.nan, None
Q = Q["Peaks"][-1] # Select most right-hand side
from_R = R - Q # Relative to R
return rpeak - from_R, Q
def _ecg_delineator_peak_P(rpeak, heartbeat, R, Q):
if Q is None:
return np.nan, None
segment = heartbeat.iloc[:Q] # Select left of Q wave
P = signal_findpeaks(segment["Signal"], height_min=0.05 * (segment["Signal"].max() - segment["Signal"].min()))
if len(P["Peaks"]) == 0:
return np.nan, None
P = P["Peaks"][np.argmax(P["Height"])] # Select heighest
from_R = R - P # Relative to R
return rpeak - from_R, P
def _ecg_delineator_peak_S(rpeak, heartbeat):
segment = heartbeat[0:] # Select right hand side
S = signal_findpeaks(-segment["Signal"], height_min=0.05 * (segment["Signal"].max() - segment["Signal"].min()))
if len(S["Peaks"]) == 0:
return np.nan, None
S = S["Peaks"][0] # Select most left-hand side
return rpeak + S, S
def _ecg_delineator_peak_T(rpeak, heartbeat, R, S):
if S is None:
return np.nan, None
segment = heartbeat.iloc[R + S :] # Select right of S wave
T = signal_findpeaks(segment["Signal"], height_min=0.05 * (segment["Signal"].max() - segment["Signal"].min()))
if len(T["Peaks"]) == 0:
return np.nan, None
T = S + T["Peaks"][np.argmax(T["Height"])] # Select heighest
return rpeak + T, T
def _ecg_delineator_peak_P_onset(rpeak, heartbeat, R, P):
if P is None:
return np.nan
segment = heartbeat.iloc[:P] # Select left of P wave
try:
signal = signal_smooth(segment["Signal"].values, size=R / 10)
except TypeError:
signal = segment["Signal"]
if len(signal) < 2:
return np.nan
signal = np.gradient(np.gradient(signal))
P_onset = np.argmax(signal)
from_R = R - P_onset # Relative to R
return rpeak - from_R
def _ecg_delineator_peak_T_offset(rpeak, heartbeat, R, T):
if T is None:
return np.nan
segment = heartbeat.iloc[R + T :] # Select left of P wave
try:
signal = signal_smooth(segment["Signal"].values, size=R / 10)
except TypeError:
signal = segment["Signal"]
if len(signal) < 2:
return np.nan
signal = np.gradient(np.gradient(signal))
T_offset = np.argmax(signal)
return rpeak + T + T_offset
# =============================================================================
# Internals
# =============================================================================
def _ecg_delineate_plot(ecg_signal, rpeaks=None, signals=None, signal_features_type="all", sampling_rate=1000):
"""# Examples.
# --------
# >>> import neurokit2 as nk
# >>> import numpy as np
# >>> import pandas as pd
# >>> import matplotlib.pyplot as plt
#
# >>> ecg_signal = np.array(pd.read_csv(
# "https://raw.githubusercontent.com/neuropsychology/NeuroKit/dev/data/ecg_1000hz.csv"))[:, 1]
#
# >>> # Extract R-peaks locations
# >>> _, rpeaks = nk.ecg_peaks(ecg_signal, sampling_rate=1000)
#
# >>> # Delineate the ECG signal with ecg_delineate()
# >>> signals, waves = nk.ecg_delineate(ecg_signal, rpeaks, sampling_rate=1000)
#
# >>> # Plot the ECG signal with markings on ECG peaks
# >>> _ecg_delineate_plot(ecg_signal, rpeaks=rpeaks, signals=signals,
# signal_features_type='peaks', sampling_rate=1000)
#
# >>> # Plot the ECG signal with markings on boundaries of R peaks
# >>> _ecg_delineate_plot(ecg_signal, rpeaks=rpeaks, signals=signals,
# signal_features_type='bound_R', sampling_rate=1000)
#
# >>> # Plot the ECG signal with markings on boundaries of P peaks
# >>> _ecg_delineate_plot(ecg_signal, rpeaks=rpeaks, signals=signals,
# signal_features_type='bound_P', sampling_rate=1000)
#
# >>> # Plot the ECG signal with markings on boundaries of T peaks
# >>> _ecg_delineate_plot(ecg_signal, rpeaks=rpeaks, signals=signals,
# signal_features_type='bound_T', sampling_rate=1000)
#
# >>> # Plot the ECG signal with markings on all peaks and boundaries
# >>> _ecg_delineate_plot(ecg_signal, rpeaks=rpeaks, signals=signals,
# signal_features_type='all', sampling_rate=1000)
"""
data = pd.DataFrame({"Signal": list(ecg_signal)})
data = pd.concat([data, signals], axis=1)
# Try retrieving right column
if isinstance(rpeaks, dict):
rpeaks = rpeaks["ECG_R_Peaks"]
# Segment the signal around the R-peaks
epochs = epochs_create(data, events=rpeaks, sampling_rate=sampling_rate, epochs_start=-0.35, epochs_end=0.55)
data = epochs_to_df(epochs)
data_cols = data.columns.values
dfs = []
for feature in data_cols:
if signal_features_type == "peaks":
if any(x in str(feature) for x in ["Peak"]):
df = data[feature]
dfs.append(df)
elif signal_features_type == "bounds_R":
if any(x in str(feature) for x in ["ECG_R_Onsets", "ECG_R_Offsets"]):
df = data[feature]
dfs.append(df)
elif signal_features_type == "bounds_T":
if any(x in str(feature) for x in ["ECG_T_Onsets", "ECG_T_Offsets"]):
df = data[feature]
dfs.append(df)
elif signal_features_type == "bounds_P":
if any(x in str(feature) for x in ["ECG_P_Onsets", "ECG_P_Offsets"]):
df = data[feature]
dfs.append(df)
elif signal_features_type == "all":
if any(x in str(feature) for x in ["Peak", "Onset", "Offset"]):
df = data[feature]
dfs.append(df)
features = pd.concat(dfs, axis=1)
fig, ax = plt.subplots()
data.Label = data.Label.astype(int)
for label in data.Label.unique():
epoch_data = data[data.Label == label]
ax.plot(epoch_data.Time, epoch_data.Signal, color="grey", alpha=0.2, label="_nolegend_")
for i, feature_type in enumerate(features.columns.values): # pylint: disable=W0612
event_data = data[data[feature_type] == 1.0]
ax.scatter(event_data.Time, event_data.Signal, label=feature_type, alpha=0.5, s=200)
ax.legend()
return fig
def _ecg_delineate_check(waves, rpeaks):
"""This function replaces the delineated features with np.nan if its standardized distance from R-peaks is more than
3."""
df = pd.DataFrame.from_dict(waves)
features_columns = df.columns
df = pd.concat([df, pd.DataFrame({"ECG_R_Peaks": rpeaks})], axis=1)
# loop through all columns to calculate the z distance
for column in features_columns: # pylint: disable=W0612
df = _calculate_abs_z(df, features_columns)
# Replace with nan if distance > 3
for col in features_columns:
for i in range(len(df)):
if df["Dist_R_" + col][i] > 3:
df[col][i] = np.nan
# Return df without distance columns
df = df[features_columns]
waves = df.to_dict("list")
return waves
def _calculate_abs_z(df, columns):
"""This function helps to calculate the absolute standardized distance between R-peaks and other delineated waves
features by `ecg_delineate()`"""
for column in columns:
df["Dist_R_" + column] = np.abs(standardize(df[column].sub(df["ECG_R_Peaks"], axis=0)))
return df