Source code for neurokit2.ecg.ecg_segment

# - * - coding: utf-8 - * -
import matplotlib.pyplot as plt
import numpy as np

from ..epochs import epochs_create, epochs_to_df
from ..signal import signal_rate
from .ecg_peaks import ecg_peaks


[docs]def ecg_segment(ecg_cleaned, rpeaks=None, sampling_rate=1000, show=False): """Segment an ECG signal into single heartbeats. Parameters ---------- ecg_cleaned : Union[list, np.array, pd.Series] The cleaned ECG channel as returned by `ecg_clean()`. rpeaks : dict The samples at which the R-peaks occur. Dict returned by `ecg_peaks()`. Defaults to None. sampling_rate : int The sampling frequency of `ecg_signal` (in Hz, i.e., samples/second). Defaults to 1000. show : bool If True, will return a plot of heartbeats. Defaults to False. Returns ------- dict A dict containing DataFrames for all segmented heartbeats. See Also -------- ecg_clean, ecg_plot Examples -------- >>> import neurokit2 as nk >>> >>> ecg = nk.ecg_simulate(duration=15, sampling_rate=1000, heart_rate=80) >>> ecg_cleaned = nk.ecg_clean(ecg, sampling_rate=1000) >>> nk.ecg_segment(ecg_cleaned, rpeaks=None, sampling_rate=1000, show=True) #doctest: +ELLIPSIS {'1': Signal Index Label ... '2': Signal Index Label ... '19': Signal Index Label ...} """ # Sanitize inputs if rpeaks is None: _, rpeaks = ecg_peaks(ecg_cleaned, sampling_rate=sampling_rate, correct_artifacts=True) rpeaks = rpeaks["ECG_R_Peaks"] epochs_start, epochs_end = _ecg_segment_window( rpeaks=rpeaks, sampling_rate=sampling_rate, desired_length=len(ecg_cleaned) ) heartbeats = epochs_create( ecg_cleaned, rpeaks, sampling_rate=sampling_rate, epochs_start=epochs_start, epochs_end=epochs_end ) if show: heartbeats_plot = epochs_to_df(heartbeats) heartbeats_pivoted = heartbeats_plot.pivot(index="Time", columns="Label", values="Signal") plt.plot(heartbeats_pivoted) plt.xlabel("Time (s)") plt.title("Individual Heart Beats") cmap = iter( plt.cm.YlOrRd(np.linspace(0, 1, num=int(heartbeats_plot["Label"].nunique()))) ) # pylint: disable=no-member lines = [] for x, color in zip(heartbeats_pivoted, cmap): (line,) = plt.plot(heartbeats_pivoted[x], color=color) lines.append(line) return heartbeats
def _ecg_segment_window(heart_rate=None, rpeaks=None, sampling_rate=1000, desired_length=None): # Extract heart rate if heart_rate is not None: heart_rate = np.mean(heart_rate) if rpeaks is not None: heart_rate = np.mean(signal_rate(rpeaks, sampling_rate=sampling_rate, desired_length=desired_length)) # Modulator m = heart_rate / 60 # Window epochs_start = -0.35 / m epochs_end = 0.5 / m # Adjust for high heart rates if heart_rate >= 80: c = 0.1 epochs_start = epochs_start - c epochs_end = epochs_end + c return epochs_start, epochs_end