123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601 |
- # -*- coding: utf-8 -*-
- """
- .. currentmodule:: elephant.causality
- Overview
- --------
- This module provides functions to estimate causal influences of signals on each
- other.
- Granger causality
- ~~~~~~~~~~~~~~~~~
- Granger causality is a method to determine causal influence of one signal on
- another based on autoregressive modelling. It was developed by Nobel prize
- laureate Clive Granger and has been adopted in various numerical fields ever
- since :cite:`granger-Granger69_424`. In its simplest form, the
- method tests whether the past values of one signal help to reduce the
- prediction error of another signal, compared to the past values of the latter
- signal alone. If it does reduce the prediction error, the first signal is said
- to Granger cause the other signal.
- Limitations
- +++++++++++
- The user must be mindful of the method's limitations, which are assumptions of
- covariance stationary data, linearity imposed by the underlying autoregressive
- modelling as well as the fact that the variables not included in the model will
- not be accounted for :cite:`granger-Seth07_1667`.
- Implementation
- ++++++++++++++
- The mathematical implementation of Granger causality methods in this module
- closely follows :cite:`granger-Ding06_0608035`.
- Overview of Functions
- ---------------------
- Various formulations of Granger causality have been developed. In this module
- you will find functions for time-series data to test pairwise Granger causality
- (`pairwise_granger`).
- Time-series Granger causality
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- .. autosummary::
- :toctree: toctree/causality/
- pairwise_granger
- conditional_granger
- References
- ----------
- .. bibliography:: ../bib/elephant.bib
- :labelprefix: gr
- :keyprefix: granger-
- :style: unsrt
- :copyright: Copyright 2014-2020 by the Elephant team, see `doc/authors.rst`.
- :license: Modified BSD, see LICENSE.txt for details.
- """
- from __future__ import division, print_function, unicode_literals
- import warnings
- from collections import namedtuple
- import numpy as np
- from neo.core import AnalogSignal
__all__ = (
    "Causality",
    "pairwise_granger",
    "conditional_granger"
)

# The return type of the pairwise_granger() function: a named tuple bundling
# the four Granger-causality measures computed for a pair of signals X and Y.
Causality = namedtuple('Causality',
                       ['directional_causality_x_y',
                        'directional_causality_y_x',
                        'instantaneous_causality',
                        'total_interdependence'])
- def _bic(cov, order, dimension, length):
- """
- Calculate Bayesian Information Criterion
- Parameters
- ----------
- cov : np.ndarray
- covariance matrix of auto regressive model
- order : int
- order of autoregressive model
- dimension : int
- dimensionality of the data
- length : int
- number of time samples
- Returns
- -------
- criterion : float
- Bayesian Information Criterion
- """
- sign, log_det_cov = np.linalg.slogdet(cov)
- criterion = 2 * log_det_cov \
- + 2*(dimension**2)*order*np.log(length)/length
- return criterion
- def _aic(cov, order, dimension, length):
- """
- Calculate Akaike Information Criterion
- Parameters
- ----------
- cov : np.ndarray
- covariance matrix of auto regressive model
- order : int
- order of autoregressive model
- dimension : int
- dimensionality of the data
- length : int
- number of time samples
- Returns
- -------
- criterion : float
- Akaike Information Criterion
- """
- sign, log_det_cov = np.linalg.slogdet(cov)
- criterion = 2 * log_det_cov \
- + 2*(dimension**2)*order/length
- return criterion
- def _lag_covariances(signals, dimension, max_lag):
- r"""
- Determine covariances of time series and time shift of itself up to a
- maximal lag
- Parameters
- ----------
- signals: np.ndarray
- time series data
- dimension : int
- number of time series
- max_lag: int
- maximal time lag to be considered
- Returns
- -------
- lag_corr : np.ndarray
- correlations matrices of lagged signals
- Covariance of shifted signals calculated according to the following
- formula:
- x: d-dimensional signal
- x^T: transpose of d-dimensional signal
- N: number of time points
- \tau: lag
- C(\tau) = \sum_{i=0}^{N-\tau} x[i]*x^T[\tau+i]
- """
- length = np.size(signals[0])
- if length < max_lag:
- raise ValueError("Maximum lag larger than size of data")
- # centralize time series
- signals_mean = (signals - np.mean(signals, keepdims=True)).T
- lag_covariances = np.zeros((max_lag+1, dimension, dimension))
- # determine lagged covariance for different time lags
- for lag in range(0, max_lag+1):
- lag_covariances[lag] = \
- np.mean(np.einsum('ij,ik -> ijk', signals_mean[:length-lag],
- signals_mean[lag:]), axis=0)
- return lag_covariances
def _yule_walker_matrix(data, dimension, order):
    r"""
    Build the block matrix of the Yule-Walker equations.

    Parameters
    ----------
    data : np.ndarray
        time series data from which the lagged covariances are estimated
    dimension : int
        dimensionality of data (e.g. number of channels)
    order : int
        order of the autoregressive model

    Returns
    -------
    yule_walker_matrix : np.ndarray
        block-structured symmetric matrix of shape
        (dimension \cdot order) \times (dimension \cdot order).
        For 1 \leq i \leq j \leq p (p the model order), the
        (dimension \times dimension) block M_ij equals C(j-i)^T, where
        C(\tau) is the covariance at lag \tau; the remaining blocks are
        determined by symmetry.
    lag_covariances : np.ndarray
        the lagged covariance matrices used to fill the blocks
    """
    lag_covariances = _lag_covariances(data, dimension, order)

    yule_walker_matrix = np.zeros((dimension * order, dimension * order))

    # Fill the upper block triangle and mirror every block at the same
    # time, so the matrix ends up symmetric in its block structure.
    for i in range(order):
        rows = slice(i * dimension, (i + 1) * dimension)
        for j in range(i, order):
            cols = slice(j * dimension, (j + 1) * dimension)
            block = lag_covariances[j - i]
            yule_walker_matrix[rows, cols] = block.T
            yule_walker_matrix[cols, rows] = block

    return yule_walker_matrix, lag_covariances
def _vector_arm(signals, dimension, order):
    r"""
    Determine coefficients of autoregressive model from time series data.

    Coefficients of autoregressive model calculated via solving the linear
    equation

        M A = C

    where
        M: Yule-Walker Matrix
        A: Coefficients of autoregressive model
        C: Time-shifted covariances with positive lags

    Covariance matrix C_0 is then given by

        C_0 = C[0] - \sum_{i=0}^{p-1} A[i]C[i+1]

    where p is the order of the autoregressive model.

    Parameters
    ----------
    signals : np.ndarray
        time series data
    dimension : int
        dimensionality of the data (number of time series)
    order : int
        order of the autoregressive model

    Returns
    -------
    coeffs : np.ndarray
        coefficients of the autoregressive model,
        shape (order, dimension, dimension)
    cov_matrix : np.ndarray
        covariance matrix of the model residuals
    """
    yule_walker_matrix, lag_covariances = \
        _yule_walker_matrix(signals, dimension, order)

    positive_lag_covariances = np.reshape(lag_covariances[1:],
                                          (dimension * order, dimension))

    # rcond=None selects the modern machine-precision-based cutoff and
    # silences the FutureWarning emitted when lstsq is called with the
    # legacy default.
    lstsq_coeffs = \
        np.linalg.lstsq(yule_walker_matrix, positive_lag_covariances,
                        rcond=None)[0]

    # the solution is stacked block-wise: one (dimension x dimension)
    # coefficient matrix per lag
    coeffs = []
    for index in range(order):
        coeffs.append(
            lstsq_coeffs[index * dimension:(index + 1) * dimension, ].T)

    coeffs = np.stack(coeffs)

    # residual covariance: C_0 minus the explained lagged covariances
    cov_matrix = np.copy(lag_covariances[0])
    for i in range(order):
        cov_matrix -= np.matmul(coeffs[i], lag_covariances[i + 1])

    return coeffs, cov_matrix
- def _optimal_vector_arm(signals, dimension, max_order,
- information_criterion='aic'):
- """
- Determine optimal auto regressive model by choosing optimal order via
- Information Criterion
- Parameters
- ----------
- signals : np.ndarray
- time series data
- dimension : int
- dimensionality of the data
- max_order : int
- maximal order to consider
- information_criterion : str
- A function to compute the information criterion:
- `bic` for Bayesian information_criterion,
- `aic` for Akaike information criterion
- Default: aic
- Returns
- -------
- optimal_coeffs: np.ndarray
- coefficients of the autoregressive model
- optimal_cov_mat : np.ndarray
- covariance matrix of
- optimal_order : int
- optimal order
- """
- length = np.size(signals[0])
- optimal_ic = np.infty
- optimal_order = 1
- optimal_coeffs = np.zeros((dimension, dimension, optimal_order))
- optimal_cov_matrix = np.zeros((dimension, dimension))
- for order in range(1, max_order + 1):
- coeffs, cov_matrix = _vector_arm(signals, dimension, order)
- if information_criterion == 'aic':
- temp_ic = _aic(cov_matrix, order, dimension, length)
- elif information_criterion == 'bic':
- temp_ic = _bic(cov_matrix, order, dimension, length)
- else:
- raise ValueError("The specified information criterion is not"
- "available. Please use 'aic' or 'bic'.")
- if temp_ic < optimal_ic:
- optimal_ic = temp_ic
- optimal_order = order
- optimal_coeffs = coeffs
- optimal_cov_matrix = cov_matrix
- return optimal_coeffs, optimal_cov_matrix, optimal_order
def pairwise_granger(signals, max_order, information_criterion='aic'):
    r"""
    Determine Granger Causality of two time series

    Parameters
    ----------
    signals : (N, 2) np.ndarray or neo.AnalogSignal
        A matrix with two time series (second dimension) that have N time
        points (first dimension).
    max_order : int
        Maximal order of autoregressive model.
    information_criterion : {'aic', 'bic'}, optional
        A function to compute the information criterion:
            `bic` for Bayesian information_criterion,
            `aic` for Akaike information criterion,
        Default: 'aic'.

    Returns
    -------
    Causality
        A `namedtuple` with the following attributes:
            directional_causality_x_y : float
                The Granger causality value for X influence onto Y.
            directional_causality_y_x : float
                The Granger causality value for Y influence onto X.
            instantaneous_causality : float
                The remaining channel interdependence not accounted for by
                the directional causalities (e.g. shared input to X and Y).
            total_interdependence : float
                The sum of the former three metrics. It measures the
                dependence of X and Y. If the total interdependence is
                positive, X and Y are not independent.

        Denote covariance matrix of signals
            X by C|X - a real number
            Y by C|Y - a real number
            (X,Y) by C|XY - a (2 \times 2) matrix

        directional causality X -> Y given by
            log(C|Y / C|XY_11)
        directional causality Y -> X given by
            log(C|X / C|XY_00)
        instantaneous causality of X,Y given by
            log( {C|XY_00 \cdot C|XY_11} / det{C|XY} )
        total interdependence of X,Y given by
            log( {C|X \cdot C|Y} / det{C|XY} )

    Raises
    ------
    ValueError
        If the provided signal does not have a shape of Nx2.
        If the determinant of the prediction error covariance matrix is not
        positive.

    Warns
    -----
    UserWarning
        If the log determinant of the prediction error covariance matrix is
        below the tolerance level of 1e-7.

    Notes
    -----
    The formulas used in this implementation follow
    :cite:`granger-Ding06_0608035`. The only difference being that we change
    the equation 47 in the following way:
    -R(k) - A(1)R(k - 1) - ... - A(m)R(k - m) = 0.
    This formulation allows for the usage of R values without transposition
    (i.e. directly) in equation 48.

    Examples
    --------
    Example 1. Independent variables.

    >>> import numpy as np
    >>> from elephant.causality.granger import pairwise_granger
    >>> pairwise_granger(np.random.uniform(size=(1000, 2)), max_order=2)
    Causality(directional_causality_x_y=0.0,
    directional_causality_y_x=-0.0,
    instantaneous_causality=0.0,
    total_interdependence=0.0)

    Example 2. Dependent variables. Y depends on X but not vice versa.

    .. math::
        \begin{array}{ll}
            X_t \sim \mathcal{N}(0, 1) \\
            Y_t = 3.5 \cdot X_{t-1} + \epsilon, \;
            \epsilon \sim\mathcal{N}(0, 1)
        \end{array}

    In this case, the directional causality is non-zero.

    >>> x = np.random.randn(1001)
    >>> y = 3.5 * x[:-1] + np.random.randn(1000)
    >>> signals = np.array([x[1:], y]).T  # N x 2 matrix
    >>> pairwise_granger(signals, max_order=1)
    Causality(directional_causality_x_y=2.64,
    directional_causality_y_x=0.0,
    instantaneous_causality=0.0,
    total_interdependence=2.64)
    """
    if isinstance(signals, AnalogSignal):
        signals = signals.magnitude

    if not (signals.ndim == 2 and signals.shape[1] == 2):
        raise ValueError("The input 'signals' must be of dimensions Nx2.")

    # transpose (N,2) -> (2,N) for mathematical convenience
    signals = signals.T

    # signal_x and signal_y are (1, N) arrays
    signal_x, signal_y = np.expand_dims(signals, axis=1)

    # univariate models of X and Y, and the joint bivariate model
    coeffs_x, var_x, p_1 = _optimal_vector_arm(signal_x, 1, max_order,
                                               information_criterion)
    coeffs_y, var_y, p_2 = _optimal_vector_arm(signal_y, 1, max_order,
                                               information_criterion)
    coeffs_xy, cov_xy, p_3 = _optimal_vector_arm(signals, 2, max_order,
                                                 information_criterion)

    sign, log_det_cov = np.linalg.slogdet(cov_xy)
    tolerance = 1e-7

    if sign <= 0:
        raise ValueError(
            "Determinant of covariance matrix must be always positive: "
            "In this case its sign is {}".format(sign))

    if log_det_cov <= tolerance:
        warnings.warn("The value of the log determinant is at or below the "
                      "tolerance level. Proceeding with computation.",
                      UserWarning)

    # prediction-error reduction of each signal when the other is included
    directional_causality_y_x = np.log(var_x[0]) - np.log(cov_xy[0, 0])
    directional_causality_x_y = np.log(var_y[0]) - np.log(cov_xy[1, 1])

    # log(C|XY_00 * C|XY_11 / det(C|XY))
    instantaneous_causality = \
        np.log(cov_xy[0, 0]) + np.log(cov_xy[1, 1]) - log_det_cov
    instantaneous_causality = np.asarray(instantaneous_causality)

    # log(C|X * C|Y / det(C|XY)); equals the sum of the three terms above
    total_interdependence = np.log(var_x[0]) + np.log(var_y[0]) - log_det_cov

    # Round GC according to following scheme:
    #     Note that standard error scales as 1/sqrt(sample_size)
    #     Calculate significant figures according to standard error
    length = np.size(signal_x)
    asymptotic_std_error = 1/np.sqrt(length)
    est_sig_figures = int((-1)*np.around(np.log10(asymptotic_std_error)))

    directional_causality_x_y_round = np.around(directional_causality_x_y,
                                                est_sig_figures)
    directional_causality_y_x_round = np.around(directional_causality_y_x,
                                                est_sig_figures)
    instantaneous_causality_round = np.around(instantaneous_causality,
                                              est_sig_figures)
    total_interdependence_round = np.around(total_interdependence,
                                            est_sig_figures)

    return Causality(
        directional_causality_x_y=directional_causality_x_y_round.item(),
        directional_causality_y_x=directional_causality_y_x_round.item(),
        instantaneous_causality=instantaneous_causality_round.item(),
        total_interdependence=total_interdependence_round.item())
def conditional_granger(signals, max_order, information_criterion='aic'):
    r"""
    Determine conditional Granger Causality of the second time series on the
    first time series, given the third time series. In other words, for time
    series X_t, Y_t and Z_t, this function tests if Y_t influences X_t via
    Z_t.

    Parameters
    ----------
    signals : (N, 3) np.ndarray or neo.AnalogSignal
        A matrix with three time series (second dimension) that have N time
        points (first dimension). The time series to be conditioned on is
        the third.
    max_order : int
        Maximal order of autoregressive model.
    information_criterion : {'aic', 'bic'}, optional
        A function to compute the information criterion:
            `bic` for Bayesian information_criterion,
            `aic` for Akaike information criterion,
        Default: 'aic'.

    Returns
    -------
    conditional_causality_xy_z_round : float
        The value of conditional causality of Y_t on X_t given Z_t. Zero
        value indicates that causality of Y_t on X_t is solely dependent on
        Z_t.

    Raises
    ------
    ValueError
        If the provided signal does not have a shape of Nx3.

    Notes
    -----
    The formulas used in this implementation follow
    :cite:`granger-Ding06_0608035`. Specifically, the Eq 35.
    """
    if isinstance(signals, AnalogSignal):
        signals = signals.magnitude

    if signals.ndim != 2 or signals.shape[1] != 3:
        raise ValueError("The input 'signals' must be of dimensions Nx3.")

    # (N, 3) -> (3, N): one row per channel, as the AR helpers expect
    signals = signals.T

    # each of signal_x, signal_y, signal_z has shape (1, N)
    signal_x, signal_y, signal_z = np.expand_dims(signals, axis=1)

    # reduced model: X conditioned on Z only (Y left out)
    signals_xz = np.vstack([signal_x, signal_z])
    _, cov_xz, _ = _optimal_vector_arm(
        signals_xz, dimension=2, max_order=max_order,
        information_criterion=information_criterion)

    # full model: X conditioned on both Y and Z
    _, cov_xyz, _ = _optimal_vector_arm(
        signals, dimension=3, max_order=max_order,
        information_criterion=information_criterion)

    # Eq. 35 of Ding et al.: log-ratio of the residual variances of X
    conditional_causality_xy_z = np.log(cov_xz[0, 0]) - np.log(cov_xyz[0, 0])

    # Round to the number of significant figures implied by the asymptotic
    # standard error, which scales as 1 / sqrt(sample size).
    n_samples = np.size(signal_x)
    asymptotic_std_error = 1 / np.sqrt(n_samples)
    significant_figures = int((-1) * np.around(np.log10(asymptotic_std_error)))

    return np.around(conditional_causality_xy_z, significant_figures)
|