# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
from ..ecg.ecg_analyze import ecg_analyze
from ..ecg.ecg_rsa import ecg_rsa
from ..eda.eda_analyze import eda_analyze
from ..emg.emg_analyze import emg_analyze
from ..rsp.rsp_analyze import rsp_analyze
[docs]def bio_analyze(data, sampling_rate=1000, method="auto"):
"""Automated analysis of bio signals.
Wrapper for other bio analyze functions of
electrocardiography signals (ECG), respiration signals (RSP),
electrodermal activity (EDA) and electromyography signals (EMG).
Parameters
----------
data : DataFrame
The DataFrame containing all the processed signals, typically
produced by `bio_process()`, `ecg_process()`, `rsp_process()`,
`eda_process()`, or `emg_process()`.
sampling_rate : int
The sampling frequency of the signals (in Hz, i.e., samples/second).
Defaults to 1000.
method : str
Can be one of 'event-related' for event-related analysis on epochs,
or 'interval-related' for analysis on longer periods of data. Defaults
to 'auto' where the right method will be chosen based on the
mean duration of the data ('event-related' for duration under 10s).
Returns
----------
DataFrame
DataFrame of the analyzed bio features. See docstrings of `ecg_analyze()`,
`rsp_analyze()`, `eda_analyze()` and `emg_analyze()` for more details.
Also returns Respiratory Sinus Arrhythmia features produced by
`ecg_rsa()` if interval-related analysis is carried out.
See Also
----------
ecg_analyze, rsp_analyze, eda_analyze, emg_analyze
Examples
----------
>>> import neurokit2 as nk
>>>
>>> # Example 1: Event-related analysis
>>> # Download data
>>> data = nk.data("bio_eventrelated_100hz")
>>>
>>> # Process the data
>>> df, info = nk.bio_process(ecg=data["ECG"], rsp=data["RSP"], eda=data["EDA"],
... keep=data["Photosensor"], sampling_rate=100)
>>>
>>> # Build epochs
>>> events = nk.events_find(data["Photosensor"], threshold_keep='below',
... event_conditions=["Negative", "Neutral",
... "Neutral", "Negative"])
>>> epochs = nk.epochs_create(df, events, sampling_rate=100, epochs_start=-0.1,
... epochs_end=1.9)
>>>
>>> # Analyze
>>> nk.bio_analyze(epochs, sampling_rate=100) #doctest: +SKIP
>>>
>>> # Example 2: Interval-related analysis
>>> # Download data
>>> data = nk.data("bio_resting_5min_100hz")
>>>
>>> # Process the data
>>> df, info = nk.bio_process(ecg=data["ECG"], rsp=data["RSP"], sampling_rate=100)
>>>
>>> # Analyze
>>> nk.bio_analyze(df, sampling_rate=100) #doctest: +SKIP
"""
features = pd.DataFrame()
method = method.lower()
# Sanitize input
if isinstance(data, pd.DataFrame):
ecg_cols = [col for col in data.columns if "ECG" in col]
rsp_cols = [col for col in data.columns if "RSP" in col]
eda_cols = [col for col in data.columns if "EDA" in col]
emg_cols = [col for col in data.columns if "EMG" in col]
ecg_rate_col = [col for col in data.columns if "ECG_Rate" in col]
rsp_phase_col = [col for col in data.columns if "RSP_Phase" in col]
elif isinstance(data, dict):
for i in data:
ecg_cols = [col for col in data[i].columns if "ECG" in col]
rsp_cols = [col for col in data[i].columns if "RSP" in col]
eda_cols = [col for col in data[i].columns if "EDA" in col]
emg_cols = [col for col in data[i].columns if "EMG" in col]
ecg_rate_col = [col for col in data[i].columns if "ECG_Rate" in col]
rsp_phase_col = [col for col in data[i].columns if "RSP_Phase" in col]
else:
raise ValueError(
"NeuroKit error: bio_analyze(): Wrong input, please make sure you enter a DataFrame or a dictionary. "
)
# ECG
ecg_data = data.copy()
if len(ecg_cols) != 0:
ecg_analyzed = ecg_analyze(ecg_data, sampling_rate=sampling_rate, method=method)
features = pd.concat([features, ecg_analyzed], axis=1, sort=False)
# RSP
rsp_data = data.copy()
if len(rsp_cols) != 0:
rsp_analyzed = rsp_analyze(rsp_data, sampling_rate=sampling_rate, method=method)
features = pd.concat([features, rsp_analyzed], axis=1, sort=False)
# EDA
if len(eda_cols) != 0:
eda_analyzed = eda_analyze(data, sampling_rate=sampling_rate, method=method)
features = pd.concat([features, eda_analyzed], axis=1, sort=False)
# EMG
if len(emg_cols) != 0:
emg_analyzed = emg_analyze(data, sampling_rate=sampling_rate, method=method)
features = pd.concat([features, emg_analyzed], axis=1, sort=False)
# RSA
if len(ecg_rate_col + rsp_phase_col) >= 3:
# Event-related
if method in ["event-related", "event", "epoch"]:
rsa = _bio_analyze_rsa_event(data)
# Interval-related
elif method in ["interval-related", "interval", "resting-state"]:
rsa = _bio_analyze_rsa_interval(data, sampling_rate=sampling_rate)
# Auto
else:
duration = _bio_analyze_findduration(data, sampling_rate=sampling_rate)
if duration >= 10:
rsa = _bio_analyze_rsa_interval(data, sampling_rate=sampling_rate)
else:
rsa = _bio_analyze_rsa_event(data)
features = pd.concat([features, rsa], axis=1, sort=False)
# Remove duplicate columns of Label and Condition
if "Label" in features.columns.values:
features = features.loc[:, ~features.columns.duplicated()]
return features
# =============================================================================
# Internals
# =============================================================================
def _bio_analyze_findduration(data, sampling_rate=1000):
# If DataFrame
if isinstance(data, pd.DataFrame):
if "Label" in data.columns:
labels = data["Label"].unique()
durations = [len(data[data["Label"] == label]) / sampling_rate for label in labels]
else:
durations = [len(data) / sampling_rate]
# If dictionary
if isinstance(data, dict):
durations = [len(data[i]) / sampling_rate for i in data]
return np.nanmean(durations)
def _bio_analyze_rsa_interval(data, sampling_rate=1000):
# RSA features for interval-related analysis
if isinstance(data, pd.DataFrame):
rsa = ecg_rsa(data, sampling_rate=sampling_rate, continuous=False)
rsa = pd.DataFrame.from_dict(rsa, orient="index").T
if isinstance(data, dict):
rsa = {}
for index in data:
rsa[index] = {} # Initialize empty container
data[index] = data[index].set_index("Index").drop(["Label"], axis=1)
rsa[index] = ecg_rsa(data[index], sampling_rate=sampling_rate)
rsa = pd.DataFrame.from_dict(rsa, orient="index")
return rsa
def _bio_analyze_rsa_event(data, rsa={}):
# RSA features for event-related analysis
if isinstance(data, dict):
for i in data:
rsa[i] = {}
rsa[i] = _bio_analyze_rsa_epoch(data[i], rsa[i])
rsa = pd.DataFrame.from_dict(rsa, orient="index")
if isinstance(data, pd.DataFrame):
rsa = data.groupby("Label")["RSA_P2T"].mean()
# TODO Needs further fixing
return rsa
def _bio_analyze_rsa_epoch(epoch, output={}):
# RSA features for event-related analysis: epoching
if np.min(epoch.index.values) <= 0:
baseline = epoch["RSA_P2T"][epoch.index <= 0].values
signal = epoch["RSA_P2T"][epoch.index > 0].values
output["RSA_P2T"] = np.mean(signal) - np.mean(baseline)
else:
signal = epoch["RSA_P2T"].values
output["RSA_P2T"] = np.mean(signal)
return output