Source code for neurokit2.ppg.ppg_findpeaks

# -*- coding: utf-8 -*-

import matplotlib.pyplot as plt
import numpy as np
import scipy.signal

from ..signal import signal_smooth


[docs]def ppg_findpeaks(ppg_cleaned, sampling_rate=1000, method="elgendi", show=False): """Find systolic peaks in a photoplethysmogram (PPG) signal. Parameters ---------- ppg_cleaned : Union[list, np.array, pd.Series] The cleaned PPG channel as returned by `ppg_clean()`. sampling_rate : int The sampling frequency of the PPG (in Hz, i.e., samples/second). The default is 1000. method : str The processing pipeline to apply. Can be one of "elgendi". The default is "elgendi". show : bool If True, returns a plot of the thresholds used during peak detection. Useful for debugging. The default is False. Returns ------- info : dict A dictionary containing additional information, in this case the samples at which systolic peaks occur, accessible with the key "PPG_Peaks". See Also -------- ppg_simulate, ppg_clean Examples -------- >>> import neurokit2 as nk >>> import matplotlib.pyplot as plt >>> >>> ppg = nk.ppg_simulate(heart_rate=75, duration=30) >>> ppg_clean = nk.ppg_clean(ppg) >>> info = nk.ppg_findpeaks(ppg_clean) >>> peaks = info["PPG_Peaks"] >>> >>> plt.plot(ppg, label="raw PPG") #doctest: +SKIP >>> plt.plot(ppg_clean, label="clean PPG") #doctest: +SKIP >>> plt.scatter(peaks, ppg[peaks], c="r", label="systolic peaks") #doctest: +SKIP >>> plt.legend() #doctest: +SKIP References ---------- - Elgendi M, Norton I, Brearley M, Abbott D, Schuurmans D (2013) Systolic Peak Detection in Acceleration Photoplethysmograms Measured from Emergency Responders in Tropical Conditions. PLoS ONE 8(10): e76585. doi:10.1371/journal.pone.0076585. """ method = method.lower() if method in ["elgendi"]: peaks = _ppg_findpeaks_elgendi(ppg_cleaned, sampling_rate, show=show) else: raise ValueError("Neurokit error: Please use one of the following" " methods: 'elgendi'.") # Prepare output. info = {"PPG_Peaks": peaks} return info
def _ppg_findpeaks_elgendi( signal, sampling_rate=1000, peakwindow=0.111, beatwindow=0.667, beatoffset=0.02, mindelay=0.3, show=False ): """Implementation of Elgendi M, Norton I, Brearley M, Abbott D, Schuurmans D (2013) Systolic Peak Detection in Acceleration Photoplethysmograms Measured from Emergency Responders in Tropical Conditions. PLoS ONE 8(10): e76585. doi:10.1371/journal.pone.0076585. All tune-able parameters are specified as keyword arguments. `signal` must be the bandpass-filtered raw PPG with a lowcut of .5 Hz, a highcut of 8 Hz. """ if show: __, (ax0, ax1) = plt.subplots(nrows=2, ncols=1, sharex=True) ax0.plot(signal, label="filtered") # Ignore the samples with negative amplitudes and square the samples with # values larger than zero. signal[signal < 0] = 0 sqrd = signal ** 2 # Compute the thresholds for peak detection. Call with show=True in order # to visualize thresholds. ma_peak_kernel = int(np.rint(peakwindow * sampling_rate)) ma_peak = signal_smooth(sqrd, kernel="boxcar", size=ma_peak_kernel) ma_beat_kernel = int(np.rint(beatwindow * sampling_rate)) ma_beat = signal_smooth(sqrd, kernel="boxcar", size=ma_beat_kernel) thr1 = ma_beat + beatoffset * np.mean(sqrd) # threshold 1 if show: ax1.plot(sqrd, label="squared") ax1.plot(thr1, label="threshold") ax1.legend(loc="upper right") # Identify start and end of PPG waves. waves = ma_peak > thr1 beg_waves = np.where(np.logical_and(np.logical_not(waves[0:-1]), waves[1:]))[0] end_waves = np.where(np.logical_and(waves[0:-1], np.logical_not(waves[1:])))[0] # Throw out wave-ends that precede first wave-start. end_waves = end_waves[end_waves > beg_waves[0]] # Identify systolic peaks within waves (ignore waves that are too short). num_waves = min(beg_waves.size, end_waves.size) min_len = int(np.rint(peakwindow * sampling_rate)) # this is threshold 2 in the paper min_delay = int(np.rint(mindelay * sampling_rate)) peaks = [0] for i in range(num_waves): beg = beg_waves[i] end = end_waves[i] len_wave = end - beg if len_wave < min_len: continue # Visualize wave span. if show: ax1.axvspan(beg, end, facecolor="m", alpha=0.5) # Find local maxima and their prominence within wave span. data = signal[beg:end] locmax, props = scipy.signal.find_peaks(data, prominence=(None, None)) if locmax.size > 0: # Identify most prominent local maximum. peak = beg + locmax[np.argmax(props["prominences"])] # Enforce minimum delay between peaks. if peak - peaks[-1] > min_delay: peaks.append(peak) peaks.pop(0) if show: ax0.scatter(peaks, signal[peaks], c="r") peaks = np.asarray(peaks).astype(int) return peaks