# -*- coding: utf-8 -*-
import matplotlib.pyplot as plt
import numpy as np
import scipy.interpolate
from ..signal import signal_distort
[docs]def ppg_simulate(
duration=120,
sampling_rate=1000,
heart_rate=70,
frequency_modulation=0.3,
ibi_randomness=0.1,
drift=0,
motion_amplitude=0.1,
powerline_amplitude=0.01,
burst_number=0,
burst_amplitude=1,
random_state=None,
show=False,
):
"""Simulate a photoplethysmogram (PPG) signal.
Phenomenological approximation of PPG. The PPG wave is described with four landmarks: wave onset,
location of the systolic peak, location of the dicrotic notch and location of the diastolic peaks.
These landmarks are defined as x and y coordinates (in a time series). These coordinates are
then interpolated at the desired sampling rate to obtain the PPG signal.
Parameters
----------
duration : int
Desired recording length in seconds. The default is 120.
sampling_rate : int
The desired sampling rate (in Hz, i.e., samples/second). The default is 1000.
heart_rate : int
Desired simulated heart rate (in beats per minute). The default is 70.
frequency_modulation : float
Float between 0 and 1. Determines how pronounced respiratory sinus arrythmia (RSA) is
(0 corresponds to absence of RSA). The default is 0.3.
ibi_randomness : float
Float between 0 and 1. Determines how much random noise there is in the duration of each
PPG wave (0 corresponds to absence of variation). The default is 0.1.
drift : float
Float between 0 and 1. Determines how pronounced the baseline drift (.05 Hz) is
(0 corresponds to absence of baseline drift). The default is 1.
motion_amplitude : float
Float between 0 and 1. Determines how pronounced the motion artifact (0.5 Hz) is
(0 corresponds to absence of motion artifact). The default is 0.1.
powerline_amplitude : float
Float between 0 and 1. Determines how pronounced the powerline artifact (50 Hz) is
(0 corresponds to absence of powerline artifact). Note that powerline_amplitude > 0 is only
possible if 'sampling_rate' is >= 500. The default is 0.1.
burst_amplitude : float
Float between 0 and 1. Determines how pronounced high frequency burst artifacts are
(0 corresponds to absence of bursts). The default is 1.
burst_number : int
Determines how many high frequency burst artifacts occur. The default is 0.
show : bool
If true, returns a plot of the landmarks and interpolated PPG. Useful for debugging.
random_state : int
Seed for the random number generator. Keep it fixed for reproducible results.
Returns
-------
ppg : array
A vector containing the PPG.
See Also
--------
ecg_simulate, rsp_simulate, eda_simulate, emg_simulate
Examples
--------
>>> import neurokit2 as nk
>>>
>>> ppg = nk.ppg_simulate(duration=40, sampling_rate=500, heart_rate=75, random_state=42)
"""
# At the requested sampling rate, how long is a period at the requested
# heart-rate and how often does that period fit into the requested
# duration?
period = 60 / heart_rate # in seconds
n_period = int(np.floor(duration / period))
periods = np.ones(n_period) * period
# Seconds at which waves begin.
x_onset = np.cumsum(periods)
x_onset -= x_onset[0] # make sure seconds start at zero
# Add respiratory sinus arrythmia (frequency modulation).
periods, x_onset = _frequency_modulation(
periods, x_onset, modulation_frequency=0.05, modulation_strength=frequency_modulation
)
# Randomly modulate duration of waves by subracting a random value between
# 0 and ibi_randomness% of the wave duration (see function definition).
x_onset = _random_x_offset(x_onset, np.diff(x_onset), ibi_randomness)
# Corresponding signal amplitudes.
y_onset = np.random.normal(0, 0.1, n_period)
# Seconds at which the systolic peaks occur within the waves.
x_sys = x_onset + np.random.normal(0.175, 0.01, n_period) * periods
# Corresponding signal amplitudes.
y_sys = y_onset + np.random.normal(1.5, 0.15, n_period)
# Seconds at which the dicrotic notches occur within the waves.
x_notch = x_onset + np.random.normal(0.4, 0.001, n_period) * periods
# Corresponding signal amplitudes (percentage of systolic peak height).
y_notch = y_sys * np.random.normal(0.49, 0.01, n_period)
# Seconds at which the diastolic peaks occur within the waves.
x_dia = x_onset + np.random.normal(0.45, 0.001, n_period) * periods
# Corresponding signal amplitudes (percentage of systolic peak height).
y_dia = y_sys * np.random.normal(0.51, 0.01, n_period)
x_all = np.concatenate((x_onset, x_sys, x_notch, x_dia))
x_all.sort(kind="mergesort")
x_all = np.rint(x_all * sampling_rate).astype(int) # convert seconds to samples
y_all = np.zeros(n_period * 4)
y_all[0::4] = y_onset
y_all[1::4] = y_sys
y_all[2::4] = y_notch
y_all[3::4] = y_dia
if show:
__, (ax0, ax1) = plt.subplots(nrows=2, ncols=1, sharex=True)
ax0.scatter(x_all, y_all, c="r")
# Interpolate a continuous signal between the landmarks (i.e., Cartesian
# coordinates).
f = scipy.interpolate.Akima1DInterpolator(x_all, y_all)
samples = np.arange(int(np.ceil(duration * sampling_rate)))
ppg = f(samples)
# Remove NAN (values outside interpolation range, i.e., after last sample).
ppg[np.isnan(ppg)] = np.nanmean(ppg)
if show:
ax0.plot(ppg)
# Add baseline drift.
if drift > 0:
drift_freq = 0.05
if drift_freq < (1 / duration) * 2:
drift_freq = (1 / duration) * 2
ppg = signal_distort(
ppg,
sampling_rate=sampling_rate,
noise_amplitude=drift,
noise_frequency=drift_freq,
random_state=random_state,
silent=True,
)
# Add motion artifacts.
if motion_amplitude > 0:
motion_freq = 0.5
ppg = signal_distort(
ppg,
sampling_rate=sampling_rate,
noise_amplitude=motion_amplitude,
noise_frequency=motion_freq,
random_state=random_state,
silent=True,
)
# Add high frequency bursts.
if burst_amplitude > 0:
ppg = signal_distort(
ppg,
sampling_rate=sampling_rate,
artifacts_amplitude=burst_amplitude,
artifacts_frequency=100,
artifacts_number=burst_number,
random_state=random_state,
silent=True,
)
# Add powerline noise.
if powerline_amplitude > 0:
ppg = signal_distort(
ppg,
sampling_rate=sampling_rate,
powerline_amplitude=powerline_amplitude,
powerline_frequency=50,
random_state=random_state,
silent=True,
)
if show:
ax1.plot(ppg)
return ppg
def _frequency_modulation(periods, seconds, modulation_frequency, modulation_strength):
"""modulator_frequency determines the frequency at which respiratory sinus arrhythmia occurs (in Hz).
modulator_strength must be between 0 and 1.
"""
modulation_mean = 1.1
# Enforce minimum inter-beat-interval of 300 milliseconds.
if (modulation_mean - modulation_strength) * periods[
0
] < 0.3: # elements in periods all have the same value at this point
print(
"Skipping frequency modulation, since the modulation_strength"
f" {modulation_strength} leads to physiologically implausible"
f" wave durations of {((modulation_mean - modulation_strength) * periods[0]) * 1000}"
f" milliseconds."
)
return periods, seconds
# Apply a very conservative Nyquist criterion.
nyquist = (1 / periods[0]) * 0.1
if modulation_frequency > nyquist:
print(f"Please choose a modulation frequency lower than {nyquist}.")
# Generate a sine with mean 1.1 and amplitude modulation_strength, that is,
# ranging from 1.1 - modulation_strength to 1.1 + modulation_strength. Note
# that the mean must be 1.1 rather than 1 in order to not produce periods
# of duration 0 (i.e., at minimum the duration period is scaled down to
# .1 * period instead of 0 * period).
modulator = modulation_strength * np.sin(2 * np.pi * modulation_frequency * seconds) + modulation_mean
periods_modulated = periods * modulator
seconds_modulated = np.cumsum(periods_modulated)
seconds_modulated -= seconds_modulated[0] # make sure seconds start at zero
return periods_modulated, seconds_modulated
def _random_x_offset(x, x_diff, offset_weight):
"""From each wave onset xi subtract offset_weight * (xi - xi-1) where xi-1 is
the wave onset preceding xi. offset_weight must be between 0 and 1.
"""
# Enforce minimum inter-beat-interval of 300 milliseconds.
min_x_diff = min(x_diff)
if (min_x_diff - (min_x_diff * offset_weight)) < 0.3:
print(
"Skipping random IBI modulation, since the offset_weight"
f" {offset_weight} leads to physiologically implausible wave"
f" durations of {(min_x_diff - (min_x_diff * offset_weight)) * 1000}"
f" milliseconds."
)
return x
offsets = []
for i in x_diff:
max_offset = offset_weight * i
offset = np.random.uniform(0, max_offset)
offsets.append(offset)
x[1:] -= offsets
return x
def _amplitude_modulation():
# TODO
pass