KCIT is a nonparametric conditional independence (CI) test for continuous random variables proposed by Zhang et al. [2]. Its prominent advantage is that the asymptotic null distribution of the test statistic is derived analytically, so it can be estimated efficiently rather than simulated with a permutation scheme.
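Concretely, in the implementation below the unconditional statistic is T = tr(Kx·Ky)/n, where Kx and Ky are the centered Gaussian kernel matrices of X and Y. Its null distribution is approximated by a Gamma(k, theta) distribution with shape k = E[T]^2 / Var[T] and scale theta = Var[T] / E[T], using E[T] = tr(Kx)·tr(Ky)/n^2 and Var[T] = 2·tr(Kx·Kx)·tr(Ky·Ky)/n^4. For the conditional test, the kernels are first "regressed out" on Z via Rz = epsilon·(Kz + epsilon·I)^(-1) before the same Gamma approximation is applied.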
import pandas as pd
import numpy as np
import scipy.stats as stats
from scipy.spatial.distance import pdist, squareform

def gaussian_kernel(data: pd.DataFrame, X: set, sigma: float = None, unit_variance: bool = False) -> np.ndarray:
    X = data.loc[:, list(X)].to_numpy()
    # Scale each column to unit variance unless the caller guarantees it already has it.
    if not unit_variance:
        X = X / X.std(axis=0)
    # from https://stats.stackexchange.com/questions/15798/how-to-calculate-a-gaussian-kernel-effectively-in-numpy
    dist = squareform(pdist(X, 'euclidean'))
    # If sigma is not given, use the median of the pairwise distances (median heuristic).
    if sigma is None:
        n = len(dist)
        sigma = np.median(dist[np.triu_indices(n, k=1)])
    K = np.exp(-dist**2 / (2 * sigma**2))
    return K

def centralized_gaussian_kernel(data: pd.DataFrame, X: set, sigma: float = None, unit_variance: bool = False) -> np.ndarray:
    # Center the kernel matrix: K~ = H K H with H = I - 11'/n.
    K = gaussian_kernel(data, X, sigma, unit_variance)
    n = len(K)
    H = np.identity(n) - np.ones((n, n)) / n
    return np.matmul(np.matmul(H, K), H)
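As a quick sanity check (not part of the original code), the kernel matrix should be symmetric and the centered version should have rows and columns summing to (numerically) zero. The snippet below assumes the two helper functions above are already defined; the column names and sizes are arbitrary.
# Hypothetical sanity check for the kernel helpers defined above.
rng = np.random.default_rng(0)
df = pd.DataFrame({'A': rng.normal(size=100), 'B': rng.normal(size=100)})

K = gaussian_kernel(df, {'A', 'B'})
Kc = centralized_gaussian_kernel(df, {'A', 'B'})

print(K.shape)                                    # (100, 100)
print(np.allclose(K, K.T))                        # kernel matrix is symmetric
print(np.allclose(Kc.sum(axis=0), 0, atol=1e-8))  # centering makes rows/columns sum to ~0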
def kcit(data: pd.DataFrame, X: set, Y: set, Z: set = None, regulation: float = 1e-3,
         alpha: float = 0.05, unit_variance: bool = False, **test_kwarg) -> bool:
    # hyper-parameter setting (kernel width depends on the sample size)
    hyper_parameter = dict()
    if len(data) < 200:
        hyper_parameter['width'] = 0.8
    elif len(data) > 1200:
        hyper_parameter['width'] = 0.3
    else:
        hyper_parameter['width'] = 0.5
    for kw, value in test_kwarg.items():
        hyper_parameter[kw] = value
    n = len(data)
    # Unconditional independence test
    if Z is None:
        # Kx, Ky <- centered kernel matrices of X, Y
        Kx = centralized_gaussian_kernel(data, X, unit_variance=unit_variance)
        Ky = centralized_gaussian_kernel(data, Y, unit_variance=unit_variance)
        T = np.matmul(Kx, Ky).trace() / n
        E_T = Kx.trace() * Ky.trace() / (n**2)
        V_T = 2 * np.matmul(Kx, Kx).trace() * np.matmul(Ky, Ky).trace() / (n**4)
        k = E_T**2 / V_T
        theta = V_T / E_T
    # Conditional independence test
    else:
        # X <- X union Z
        X = X | Z
        # Kx, Ky, Kz <- centered kernel matrices of X, Y, Z with width hyper_parameter['width']
        Kx = centralized_gaussian_kernel(data, X, hyper_parameter['width'], unit_variance)
        Ky = centralized_gaussian_kernel(data, Y, hyper_parameter['width'], unit_variance)
        Kz = centralized_gaussian_kernel(data, Z, hyper_parameter['width'], unit_variance)
        # Rz = epsilon * (Kz + epsilon * I)^-1 regresses the kernels on Z
        Rz = regulation * np.linalg.inv(Kz + regulation * np.identity(n))
        Kxz = np.matmul(np.matmul(Rz, Kx), Rz)
        Kyz = np.matmul(np.matmul(Rz, Ky), Rz)
        Lxz, Vx = np.linalg.eig(Kxz)
        Lyz, Vy = np.linalg.eig(Kyz)
        # sort eigenvalues and the corresponding eigenvectors
        idx = Lxz.argsort()
        Lxz, Vx = Lxz[idx], np.real(Vx[:, idx])
        idx = Lyz.argsort()
        Lyz, Vy = Lyz[idx], np.real(Vy[:, idx])
        # w[t] = <row t of Vx, row t of Vy>
        w = np.zeros(n)
        for t in range(n):
            w[t] = np.inner(Vx[t], Vy[t])
        # With W = w w^T, the Gamma approximation only needs tr(W) = w'w and tr(WW) = (w'w)^2.
        ww = np.inner(w, w)
        T = np.matmul(Kxz, Kyz).trace() / n
        E_T = ww / n
        V_T = 2 * ww**2 / (n**2)
        k = E_T**2 / V_T
        theta = V_T / E_T
        # In the paper, theta = V_T / E_T, but in practice the power of the test
        # decreases as n grows, so when the power is too weak I switch to
        # theta = 1 / np.log10(E_T / V_T).
        # theta = 1 / np.log10(E_T / V_T)
    # Compare the statistic with the (1 - alpha) quantile of the Gamma(k, theta) approximation;
    # True means the null hypothesis of (conditional) independence is not rejected.
    cri = stats.gamma(a=k, scale=theta).ppf(1 - alpha)
    return T < cri
import scipy.stats as stats
size = 50
data = pd.DataFrame()
data['A'] = stats.norm().rvs(size=size)
data['B'] = stats.norm().rvs(size=size)
# A and B are independent
kcit(data, {'A'}, {'B'})
True
import scipy.stats as stats
size = 1000
data = pd.DataFrame()
data['A'] = stats.norm().rvs(size = size)
data['B'] = stats.norm().rvs(size = size)
data['C'] = data['A'] + data['B'] + stats.norm().rvs(size = size)
# A -> C <- B
kcit(data, {'A'}, {'B'})
True
kcit(data, {'A'}, {'B'}, {'C'})
False
import scipy.stats as stats
size = 1000
data = pd.DataFrame()
data['A'] = stats.gamma(a = 3).rvs(size = size) # dist A is not norm
data['B'] = stats.beta(a = 2, b = 3).rvs(size = size) + stats.norm().rvs(size = size) # dist B is not norm
data['C'] = 1/(data['A'] + data['B']) + stats.norm().rvs(size = size) # C = 1/(A+B) + e
data['D'] = data['A'] ** data['B'] + stats.norm().rvs(size = size) # D = A ^ B + e
# A -> C <- B
# A -> D <- B
# Therefore, C and D are conditionally independent only given both A and B
kcit(data, {'C'}, {'D'})
False
kcit(data, {'C'}, {'D'}, {'A'})
False
kcit(data, {'C'}, {'D'}, {'A', 'B'})
True
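As a final, optional check (not in the original write-up), one can estimate the empirical size of the test by repeating the independent-data example many times and counting how often independence is rejected; under the null the rejection rate should be close to alpha. The snippet below is a rough sketch using the kcit function defined above; the number of repetitions and the sample size are arbitrary choices.
# Hypothetical calibration check: rejection rate under H0 should be about alpha (0.05).
reps, size = 100, 100
rejections = 0
for _ in range(reps):
    sim = pd.DataFrame({'A': stats.norm().rvs(size=size),
                        'B': stats.norm().rvs(size=size)})
    if not kcit(sim, {'A'}, {'B'}):  # kcit returns False when independence is rejected
        rejections += 1
print(rejections / reps)  # expected to be roughly 0.05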
[2] Zhang, K., Peters, J., Janzing, D., & Schölkopf, B. (2011). Kernel-based conditional independence test and application in causal discovery. In Proceedings of the 27th Conference on Uncertainty in Artificial Intelligence (UAI 2011).