Source code for neurokit2.ecg.ecg_clean

# -*- coding: utf-8 -*-
import numpy as np
import scipy.signal

from ..misc import as_vector
from ..signal import signal_filter


def ecg_clean(ecg_signal, sampling_rate=1000, method="neurokit"):
    """Clean an ECG signal.

    Prepare a raw ECG signal for R-peak detection with the specified method.

    Parameters
    ----------
    ecg_signal : Union[list, np.array, pd.Series]
        The raw ECG channel.
    sampling_rate : int
        The sampling frequency of `ecg_signal` (in Hz, i.e., samples/second).
        Defaults to 1000.
    method : str
        The processing pipeline to apply (matched case-insensitively). Can be
        one of 'neurokit' (default), 'biosppy', 'pantompkins1985',
        'hamilton2002', 'elgendi2010', 'engzeemod2012'. The names 'christov',
        'christov2004', 'ssf', 'slopesumfunction', 'zong', 'zong2003',
        'kalidas2017', 'swt', 'kalidas', 'kalidastamil' and 'kalidastamil2017'
        are also accepted; for those no filtering is applied and the
        vectorized input is returned as is.

    Returns
    -------
    array
        Vector containing the cleaned ECG signal.

    Raises
    ------
    ValueError
        If `method` is not one of the recognized names.

    See Also
    --------
    ecg_findpeaks, signal_rate, ecg_process, ecg_plot

    Examples
    --------
    >>> import pandas as pd
    >>> import neurokit2 as nk
    >>> import matplotlib.pyplot as plt
    >>>
    >>> ecg = nk.ecg_simulate(duration=10, sampling_rate=1000)
    >>> signals = pd.DataFrame({"ECG_Raw" : ecg,
    ...                         "ECG_NeuroKit" : nk.ecg_clean(ecg, sampling_rate=1000, method="neurokit"),
    ...                         "ECG_BioSPPy" : nk.ecg_clean(ecg, sampling_rate=1000, method="biosppy"),
    ...                         "ECG_PanTompkins" : nk.ecg_clean(ecg, sampling_rate=1000, method="pantompkins1985"),
    ...                         "ECG_Hamilton" : nk.ecg_clean(ecg, sampling_rate=1000, method="hamilton2002"),
    ...                         "ECG_Elgendi" : nk.ecg_clean(ecg, sampling_rate=1000, method="elgendi2010"),
    ...                         "ECG_EngZeeMod" : nk.ecg_clean(ecg, sampling_rate=1000, method="engzeemod2012")})
    >>> signals.plot() #doctest: +ELLIPSIS
    <matplotlib.axes._subplots.AxesSubplot object at ...>

    References
    ----------
    - Jiapu Pan and Willis J. Tompkins. A Real-Time QRS Detection Algorithm. In: IEEE Transactions on
      Biomedical Engineering BME-32.3 (1985), pp. 230–236.

    - Hamilton, Open Source ECG Analysis Software Documentation, E.P.Limited, 2002.

    """
    ecg_signal = as_vector(ecg_signal)

    method = method.lower()  # remove capitalised letters
    if method in ["nk", "nk2", "neurokit", "neurokit2"]:
        clean = _ecg_clean_nk(ecg_signal, sampling_rate)
    elif method in ["biosppy", "gamboa2008"]:
        clean = _ecg_clean_biosppy(ecg_signal, sampling_rate)
    elif method in ["pantompkins", "pantompkins1985"]:
        clean = _ecg_clean_pantompkins(ecg_signal, sampling_rate)
    elif method in ["hamilton", "hamilton2002"]:
        clean = _ecg_clean_hamilton(ecg_signal, sampling_rate)
    elif method in ["elgendi", "elgendi2010"]:
        clean = _ecg_clean_elgendi(ecg_signal, sampling_rate)
    elif method in ["engzee", "engzee2012", "engzeemod", "engzeemod2012"]:
        clean = _ecg_clean_engzee(ecg_signal, sampling_rate)
    elif method in [
        "christov",
        "christov2004",
        "ssf",
        "slopesumfunction",
        "zong",
        "zong2003",
        "kalidas2017",
        "swt",
        "kalidas",
        "kalidastamil",
        "kalidastamil2017",
    ]:
        # These names have no dedicated cleaning step here: hand the
        # vectorized signal back without filtering.
        clean = ecg_signal
    else:
        # The message must advertise names the dispatcher actually accepts
        # ('pantompkins1985', not the former 'pamtompkins1985' typo).
        raise ValueError(
            "NeuroKit error: ecg_clean(): 'method' should be "
            "one of 'neurokit', 'biosppy', 'pantompkins1985',"
            " 'hamilton2002', 'elgendi2010', 'engzeemod2012'."
        )
    return clean


# =============================================================================
# Shared helper
# =============================================================================
def _normalized_band(low_hz, high_hz, sampling_rate):
    """Return ``[low, high]`` divided by the sampling rate and doubled.

    This is the cutoff pair handed to ``scipy.signal.butter`` by the
    Butterworth-based cleaners below.
    """
    return [low_hz / sampling_rate * 2, high_hz / sampling_rate * 2]


# =============================================================================
# Neurokit
# =============================================================================
def _ecg_clean_nk(ecg_signal, sampling_rate=1000):
    """Default NeuroKit pipeline: two successive ``signal_filter`` passes.

    The first pass is a 5th-order Butterworth with a 0.5 Hz low cutoff, meant
    to remove slow drift and DC offset; the second pass uses the "powerline"
    method with ``powerline=50``.
    """
    detrended = signal_filter(
        signal=ecg_signal, sampling_rate=sampling_rate, lowcut=0.5, method="butterworth", order=5
    )
    return signal_filter(signal=detrended, sampling_rate=sampling_rate, method="powerline", powerline=50)


# =============================================================================
# Biosppy
# =============================================================================
def _ecg_clean_biosppy(ecg_signal, sampling_rate=1000):
    """FIR filter with cutoffs at 3 and 45 Hz, applied forward and backward.

    Adapted from
    https://github.com/PIA-Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L69.
    """
    # Filter length is 0.3 * sampling_rate, bumped to the next odd number.
    numtaps = int(0.3 * sampling_rate)
    numtaps += (numtaps + 1) % 2

    # Cutoffs (Hz) normalized to the Nyquist frequency (Fs/2).
    band = 2 * np.array([3, 45]) / sampling_rate

    taps = scipy.signal.firwin(numtaps=numtaps, cutoff=band, pass_zero=False)
    return scipy.signal.filtfilt(taps, np.array([1]), ecg_signal)


# =============================================================================
# Pan & Tompkins (1985)
# =============================================================================
def _ecg_clean_pantompkins(ecg_signal, sampling_rate=1000):
    """First-order 5-15 Hz Butterworth band-pass, applied with ``lfilter``.

    Adapted from
    https://github.com/PIA-Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L69.
    """
    numerator, denominator = scipy.signal.butter(
        1, _normalized_band(5, 15, sampling_rate), btype="bandpass"
    )
    return scipy.signal.lfilter(numerator, denominator, ecg_signal)


# =============================================================================
# Elgendi et al. (2010)
# =============================================================================
def _ecg_clean_elgendi(ecg_signal, sampling_rate=1000):
    """Second-order 8-20 Hz Butterworth band-pass, applied with ``lfilter``.

    From https://github.com/berndporr/py-ecg-detectors/

    - Elgendi, Mohamed & Jonkman, Mirjam & De Boer, Friso. (2010). Frequency Bands Effects on QRS
      Detection. The 3rd International Conference on Bio-inspired Systems and Signal Processing
      (BIOSIGNALS2010). 428-431.

    """
    numerator, denominator = scipy.signal.butter(
        2, _normalized_band(8, 20, sampling_rate), btype="bandpass"
    )
    return scipy.signal.lfilter(numerator, denominator, ecg_signal)


# =============================================================================
# Hamilton (2002)
# =============================================================================
def _ecg_clean_hamilton(ecg_signal, sampling_rate=1000):
    """First-order 8-16 Hz Butterworth band-pass, applied with ``lfilter``.

    Adapted from
    https://github.com/PIA-Group/BioSPPy/blob/e65da30f6379852ecb98f8e2e0c9b4b5175416c3/biosppy/signals/ecg.py#L69.
    """
    numerator, denominator = scipy.signal.butter(
        1, _normalized_band(8, 16, sampling_rate), btype="bandpass"
    )
    return scipy.signal.lfilter(numerator, denominator, ecg_signal)


# =============================================================================
# Engzee Modified (2012)
# =============================================================================
def _ecg_clean_engzee(ecg_signal, sampling_rate=1000):
    """Fourth-order 48-52 Hz Butterworth band-stop, applied with ``lfilter``.

    From https://github.com/berndporr/py-ecg-detectors/

    - C. Zeelenberg, A single scan algorithm for QRS detection and feature extraction, IEEE Comp.
      in Cardiology, vol. 6, pp. 37-42, 1979.

    - A. Lourenco, H. Silva, P. Leite, R. Lourenco and A. Fred, "Real Time Electrocardiogram
      Segmentation for Finger Based ECG Biometrics", BIOSIGNALS 2012, pp. 49-54, 2012.

    """
    numerator, denominator = scipy.signal.butter(
        4, _normalized_band(48, 52, sampling_rate), btype="bandstop"
    )
    return scipy.signal.lfilter(numerator, denominator, ecg_signal)