Source code for neurokit2.ecg.ecg_simulate

# -*- coding: utf-8 -*-
import math

import numpy as np
import scipy

from ..signal import signal_distort, signal_resample


[docs]def ecg_simulate( duration=10, length=None, sampling_rate=1000, noise=0.01, heart_rate=70, method="ecgsyn", random_state=None ): """Simulate an ECG/EKG signal. Generate an artificial (synthetic) ECG signal of a given duration and sampling rate using either the ECGSYN dynamical model (McSharry et al., 2003) or a simpler model based on Daubechies wavelets to roughly approximate cardiac cycles. Parameters ---------- duration : int Desired recording length in seconds. sampling_rate : int The desired sampling rate (in Hz, i.e., samples/second). length : int The desired length of the signal (in samples). noise : float Noise level (amplitude of the laplace noise). heart_rate : int Desired simulated heart rate (in beats per minute). method : str The model used to generate the signal. Can be 'simple' for a simulation based on Daubechies wavelets that roughly approximates a single cardiac cycle. If 'ecgsyn' (default), will use an advanced model desbribed `McSharry et al. (2003) <https://physionet.org/content/ecgsyn/>`_. random_state : int Seed for the random number generator. Returns ------- array Vector containing the ECG signal. Examples ---------- >>> import pandas as pd >>> import neurokit2 as nk >>> >>> ecg1 = nk.ecg_simulate(duration=10, method="simple") >>> ecg2 = nk.ecg_simulate(duration=10, method="ecgsyn") >>> pd.DataFrame({"ECG_Simple": ecg1, ... "ECG_Complex": ecg2}).plot(subplots=True) #doctest: +ELLIPSIS array([<matplotlib.axes._subplots.AxesSubplot object at ...>, <matplotlib.axes._subplots.AxesSubplot object at ...>], dtype=object) See Also -------- rsp_simulate, eda_simulate, ppg_simulate, emg_simulate References ----------- - McSharry, P. E., Clifford, G. D., Tarassenko, L., & Smith, L. A. (2003). A dynamical model for generating synthetic electrocardiogram signals. IEEE transactions on biomedical engineering, 50(3), 289-294. - https://github.com/diarmaidocualain/ecg_simulation """ # Seed the random generator for reproducible results np.random.seed(random_state) # Generate number of samples automatically if length is unspecified if length is None: length = duration * sampling_rate if duration is None: duration = length / sampling_rate # Run appropriate method if method.lower() in ["simple", "daubechies"]: ecg = _ecg_simulate_daubechies( duration=duration, length=length, sampling_rate=sampling_rate, heart_rate=heart_rate ) else: approx_number_beats = int(np.round(duration * (heart_rate / 60))) ecg = _ecg_simulate_ecgsyn( sfecg=sampling_rate, N=approx_number_beats, Anoise=0, hrmean=heart_rate, hrstd=1, lfhfratio=0.5, sfint=sampling_rate, ti=(-70, -15, 0, 15, 100), ai=(1.2, -5, 30, -7.5, 0.75), bi=(0.25, 0.1, 0.1, 0.1, 0.4), ) # Cut to match expected length ecg = ecg[0:length] # Add random noise if noise > 0: ecg = signal_distort( ecg, sampling_rate=sampling_rate, noise_amplitude=noise, noise_frequency=[5, 10, 100], noise_shape="laplace", random_state=random_state, silent=True, ) # Reset random seed (so it doesn't affect global) np.random.seed(None) return ecg
# ============================================================================= # Daubechies # ============================================================================= def _ecg_simulate_daubechies(duration=10, length=None, sampling_rate=1000, heart_rate=70): """Generate an artificial (synthetic) ECG signal of a given duration and sampling rate. It uses a 'Daubechies' wavelet that roughly approximates a single cardiac cycle. This function is based on `this script <https://github.com/diarmaidocualain/ecg_simulation>`_. """ # The "Daubechies" wavelet is a rough approximation to a real, single, cardiac cycle cardiac = scipy.signal.wavelets.daub(10) # Add the gap after the pqrst when the heart is resting. cardiac = np.concatenate([cardiac, np.zeros(10)]) # Caculate the number of beats in capture time period num_heart_beats = int(duration * heart_rate / 60) # Concatenate together the number of heart beats needed ecg = np.tile(cardiac, num_heart_beats) # Change amplitude ecg = ecg * 10 # Resample ecg = signal_resample( ecg, sampling_rate=int(len(ecg) / 10), desired_length=length, desired_sampling_rate=sampling_rate ) return ecg # ============================================================================= # ECGSYN # ============================================================================= def _ecg_simulate_ecgsyn( sfecg=256, N=256, Anoise=0, hrmean=60, hrstd=1, lfhfratio=0.5, sfint=512, ti=(-70, -15, 0, 15, 100), ai=(1.2, -5, 30, -7.5, 0.75), bi=(0.25, 0.1, 0.1, 0.1, 0.4), ): """This function is a python translation of the matlab script by `McSharry & Clifford (2013) <https://physionet.org/content/ecgsyn>`_. Parameters ---------- % Operation uses the following parameters (default values in []s): % sfecg: ECG sampling frequency [256 Hertz] % N: approximate number of heart beats [256] % Anoise: Additive uniformly distributed measurement noise [0 mV] % hrmean: Mean heart rate [60 beats per minute] % hrstd: Standard deviation of heart rate [1 beat per minute] % lfhfratio: LF/HF ratio [0.5] % sfint: Internal sampling frequency [256 Hertz] % Order of extrema: (P Q R S T) % ti = angles of extrema (in degrees) % ai = z-position of extrema % bi = Gaussian width of peaks Returns ------- array Vector containing simulated ecg signal. # Examples # -------- # >>> import matplotlib.pyplot as plt # >>> import neurokit2 as nk # >>> # >>> s = _ecg_simulate_ecgsynth() # >>> x = np.linspace(0, len(s)-1, len(s)) # >>> num_points = 4000 # >>> # >>> num_points = min(num_points, len(s)) # >>> plt.plot(x[:num_points], s[:num_points]) #doctest: +SKIP # >>> plt.show() #doctest: +SKIP """ if not isinstance(ti, np.ndarray): ti = np.array(ti) if not isinstance(ai, np.ndarray): ai = np.array(ai) if not isinstance(bi, np.ndarray): bi = np.array(bi) ti = ti * np.pi / 180 # Adjust extrema parameters for mean heart rate hrfact = np.sqrt(hrmean / 60) hrfact2 = np.sqrt(hrfact) bi = hrfact * bi ti = np.array([hrfact2, hrfact, 1, hrfact, hrfact2]) * ti # Check that sfint is an integer multiple of sfecg q = np.round(sfint / sfecg) qd = sfint / sfecg if q != qd: raise ValueError( "Internal sampling frequency (sfint) must be an integer multiple of the ECG sampling frequency" " (sfecg). Your current choices are: sfecg = " + str(sfecg) + " and sfint = " + str(sfint) + "." ) # Define frequency parameters for rr process # flo and fhi correspond to the Mayer waves and respiratory rate respectively flo = 0.1 fhi = 0.25 flostd = 0.01 fhistd = 0.01 # Calculate time scales for rr and total output sfrr = 1 trr = 1 / sfrr rrmean = 60 / hrmean n = 2 ** (np.ceil(np.log2(N * rrmean / trr))) rr0 = _ecg_simulate_rrprocess(flo, fhi, flostd, fhistd, lfhfratio, hrmean, hrstd, sfrr, n) # Upsample rr time series from 1 Hz to sfint Hz rr = signal_resample(rr0, sampling_rate=1, desired_sampling_rate=sfint) # Make the rrn time series dt = 1 / sfint rrn = np.zeros(len(rr)) tecg = 0 i = 0 while i < len(rr): tecg += rr[i] ip = int(np.round(tecg / dt)) rrn[i:ip] = rr[i] i = ip Nt = ip # Integrate system using fourth order Runge-Kutta x0 = np.array([1, 0, 0.04]) # tspan is a tuple of (min, max) which defines the lower and upper bound of t in ODE # t_eval is the list of desired t points for ODE # in Matlab, ode45 can accepts both tspan and t_eval in one argument Tspan = [0, (Nt - 1) * dt] t_eval = np.linspace(0, (Nt - 1) * dt, Nt) # as passing extra arguments to derivative function is not supported yet in solve_ivp # lambda function is used to serve the purpose result = scipy.integrate.solve_ivp( lambda t, x: _ecg_simulate_derivsecgsyn(t, x, rrn, ti, sfint, ai, bi), Tspan, x0, t_eval=t_eval ) X0 = result.y # downsample to required sfecg X = X0[:, np.arange(0, X0.shape[1], q).astype(int)] # Scale signal to lie between -0.4 and 1.2 mV z = X[2, :].copy() zmin = np.min(z) zmax = np.max(z) zrange = zmax - zmin z = (z - zmin) * 1.6 / zrange - 0.4 # include additive uniformly distributed measurement noise eta = 2 * np.random.uniform(len(z)) - 1 return z + Anoise * eta # Return signal def _ecg_simulate_derivsecgsyn(t, x, rr, ti, sfint, ai, bi): ta = math.atan2(x[1], x[0]) r0 = 1 a0 = 1.0 - np.sqrt(x[0] ** 2 + x[1] ** 2) / r0 ip = np.floor(t * sfint).astype(int) w0 = 2 * np.pi / rr[min(ip, len(rr) - 1)] # w0 = 2*np.pi/rr[ip[ip <= np.max(rr)]] fresp = 0.25 zbase = 0.005 * np.sin(2 * np.pi * fresp * t) dx1dt = a0 * x[0] - w0 * x[1] dx2dt = a0 * x[1] + w0 * x[0] # matlab rem and numpy rem are different # dti = np.remainder(ta - ti, 2*np.pi) dti = (ta - ti) - np.round((ta - ti) / 2 / np.pi) * 2 * np.pi dx3dt = -np.sum(ai * dti * np.exp(-0.5 * (dti / bi) ** 2)) - 1 * (x[2] - zbase) dxdt = np.array([dx1dt, dx2dt, dx3dt]) return dxdt def _ecg_simulate_rrprocess( flo=0.1, fhi=0.25, flostd=0.01, fhistd=0.01, lfhfratio=0.5, hrmean=60, hrstd=1, sfrr=1, n=256 ): w1 = 2 * np.pi * flo w2 = 2 * np.pi * fhi c1 = 2 * np.pi * flostd c2 = 2 * np.pi * fhistd sig2 = 1 sig1 = lfhfratio rrmean = 60 / hrmean rrstd = 60 * hrstd / (hrmean * hrmean) df = sfrr / n w = np.arange(n) * 2 * np.pi * df dw1 = w - w1 dw2 = w - w2 Hw1 = sig1 * np.exp(-0.5 * (dw1 / c1) ** 2) / np.sqrt(2 * np.pi * c1 ** 2) Hw2 = sig2 * np.exp(-0.5 * (dw2 / c2) ** 2) / np.sqrt(2 * np.pi * c2 ** 2) Hw = Hw1 + Hw2 Hw0 = np.concatenate((Hw[0 : int(n / 2)], Hw[int(n / 2) - 1 :: -1])) Sw = (sfrr / 2) * np.sqrt(Hw0) ph0 = 2 * np.pi * np.random.uniform(size=int(n / 2 - 1)) ph = np.concatenate([[0], ph0, [0], -np.flipud(ph0)]) SwC = Sw * np.exp(1j * ph) x = (1 / n) * np.real(np.fft.ifft(SwC)) xstd = np.std(x) ratio = rrstd / xstd return rrmean + x * ratio # Return RR