Source code for neurokit2.signal.signal_changepoints

import numpy as np

from ..events import events_plot
from ..misc import as_vector


[docs]def signal_changepoints(signal, change="meanvar", penalty=None, show=False): """Change Point Detection. Only the PELT method is implemented for now. Parameters ----------- signal : Union[list, np.array, pd.Series] Vector of values. change : str Can be one of "meanvar" (default), "mean" or "var". penalty : float The algorithm penalty. Default to ``np.log(len(signal))``. show : bool Defaults to False. Returns ------- Array Values indicating the samples at which the changepoints occur. Fig Figure of plot of signal with markers of changepoints. Examples -------- >>> import neurokit2 as nk >>> >>> signal = nk.emg_simulate(burst_number=3) >>> fig = nk.signal_changepoints(signal, change="var", show=True) >>> fig #doctest: +SKIP References ---------- - Killick, R., Fearnhead, P., & Eckley, I. A. (2012). Optimal detection of changepoints with a linear computational cost. Journal of the American Statistical Association, 107(500), 1590-1598. """ signal = as_vector(signal) changepoints = _signal_changepoints_pelt(signal, change=change, penalty=penalty) if show is True: events_plot(changepoints, signal) return changepoints
def _signal_changepoints_pelt(signal, change="meanvar", penalty=None): """PELT algorithm to find change points in a signal. Adapted from: https://github.com/ruipgil/changepy https://github.com/deepcharles/ruptures https://github.com/STOR-i/Changepoints.jl https://github.com/rkillick/changepoint/ """ # Initialize length = len(signal) if penalty is None: penalty = np.log(length) # pylint: disable=E1111 if change.lower() == "var": cost = _signal_changepoints_cost_var(signal) elif change.lower() == "mean": cost = _signal_changepoints_cost_mean(signal) else: cost = _signal_changepoints_cost_meanvar(signal) # Run algorithm F = np.zeros(length + 1) R = np.array([0], dtype=np.int) candidates = np.zeros(length + 1, dtype=np.int) F[0] = -penalty # pylint: disable=E1130 for tstar in range(2, length + 1): cpt_cands = R seg_costs = np.zeros(len(cpt_cands)) for i in range(0, len(cpt_cands)): # pylint: disable=C0200 seg_costs[i] = cost(cpt_cands[i], tstar) F_cost = F[cpt_cands] + seg_costs F[tstar] = np.min(F_cost) + penalty tau = np.argmin(F_cost) candidates[tstar] = cpt_cands[tau] ineq_prune = [val < F[tstar] for val in F_cost] R = [cpt_cands[j] for j, val in enumerate(ineq_prune) if val] R.append(tstar - 1) R = np.array(R, dtype=np.int) last = candidates[-1] changepoints = [last] while last > 0: last = candidates[last] changepoints.append(last) return np.sort(changepoints) # ============================================================================= # Cost functions # ============================================================================= def _signal_changepoints_cost_mean(signal): """Cost function for a normally distributed signal with a changing mean.""" i_variance_2 = 1 / (np.var(signal) ** 2) cmm = [0.0] cmm.extend(np.cumsum(signal)) cmm2 = [0.0] cmm2.extend(np.cumsum(np.abs(signal))) def cost(start, end): cmm2_diff = cmm2[end] - cmm2[start] cmm_diff = pow(cmm[end] - cmm[start], 2) i_diff = end - start diff = cmm2_diff - cmm_diff return (diff / i_diff) * i_variance_2 return cost def _signal_changepoints_cost_var(signal): """Cost function for a normally distributed signal with a changing variance.""" cumm = [0.0] cumm.extend(np.cumsum(np.power(np.abs(signal - np.mean(signal)), 2))) def cost(s, t): dist = float(t - s) diff = cumm[t] - cumm[s] return dist * np.log(diff / dist) return cost def _signal_changepoints_cost_meanvar(signal): """Cost function for a normally distributed signal with a changing mean and variance.""" signal = np.hstack(([0.0], np.array(signal))) cumm = np.cumsum(signal) cumm_sq = np.cumsum([val ** 2 for val in signal]) def cost(s, t): ts_i = 1.0 / (t - s) mu = (cumm[t] - cumm[s]) * ts_i sig = (cumm_sq[t] - cumm_sq[s]) * ts_i - mu ** 2 sig_i = 1.0 / sig return ( (t - s) * np.log(sig) + (cumm_sq[t] - cumm_sq[s]) * sig_i - 2 * (cumm[t] - cumm[s]) * mu * sig_i + ((t - s) * mu ** 2) * sig_i ) return cost