Note: KCIT, Kernel-based Conditional Independence Test

What is KCIT?

KCIT is a nonparametric conditional independence (CI) test for continuous random variables proposed by Zhang et al. [2]. Its main advantage is that the asymptotic null distribution of the test statistic is derived analytically, so it can be approximated efficiently (here, with a moment-matched gamma distribution).
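
In rough terms, the procedure implemented below (following [2]) is: build centered Gaussian kernel matrices, take a trace statistic, and approximate its null distribution by matching the first two moments to a gamma distribution. For the unconditional case this is

$$
T = \frac{1}{n}\operatorname{tr}(\tilde{K}_X \tilde{K}_Y), \qquad
k = \frac{\mathbb{E}[T]^2}{\operatorname{Var}[T]}, \qquad
\theta = \frac{\operatorname{Var}[T]}{\mathbb{E}[T]},
$$

and independence is rejected when $T$ exceeds the $(1-\alpha)$ quantile of $\mathrm{Gamma}(k, \theta)$; the conditional case applies the same idea after "regressing out" $Z$ with a kernel ridge-style projection.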

Paper: arXiv

Implementation

def gaussian_kernel

import pandas as pd
import numpy as np
from scipy.spatial.distance import pdist, squareform

def gaussian_kernel(data: pd.DataFrame, X: set, sigma: float = None, unit_variance: bool = False) -> np.ndarray:
    X = data.loc[:, list(X)].to_numpy()

    # Standardize each column to unit variance unless the data already has it
    if not unit_variance:
        X = X / X.std(axis=0)

    # from https://stats.stackexchange.com/questions/15798/how-to-calculate-a-gaussian-kernel-effectively-in-numpy
    dist = squareform(pdist(X, 'euclidean'))

    # If sigma is not given, use the median heuristic: the median of the pairwise distances
    if not sigma:
        n = len(dist)
        sigma = np.median(dist[np.triu_indices(n, k=1)])

    K = np.exp(-dist**2 / (2 * sigma**2))
    return K
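
As a quick sanity check for gaussian_kernel (the toy data frame and its column names here are made up for illustration), the result should be a symmetric n x n matrix with ones on the diagonal, since every point is at zero distance from itself:

import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
toy = pd.DataFrame({'A': rng.normal(size=5), 'B': rng.normal(size=5)})

K = gaussian_kernel(toy, {'A', 'B'})
print(K.shape)                       # (5, 5)
print(np.allclose(K, K.T))           # True: the kernel matrix is symmetric
print(np.allclose(np.diag(K), 1.0))  # True: exp(0) = 1 on the diagonal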

def centralized_gaussian_kernel

def centralized_gaussian_kernel(data: pd.DataFrame, X: set, sigma: float = None, unit_variance: bool = False) -> np.ndarray:
    K = gaussian_kernel(data, X, sigma, unit_variance)
    n = len(K)
    # Centering matrix H = I - (1/n) * 1 1^T; H K H centers the kernel in feature space
    H = np.identity(n) - np.ones((n, n)) / n
    return np.matmul(np.matmul(H, K), H)
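
The centering matrix H = I - (1/n) 1 1^T removes the mean in feature space, so every row and column of the centered kernel matrix should sum to (numerically) zero. A minimal check, again with made-up toy data:

import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
toy = pd.DataFrame({'A': rng.normal(size=5)})

Kc = centralized_gaussian_kernel(toy, {'A'})
print(np.allclose(Kc.sum(axis=0), 0))  # True up to floating-point error
print(np.allclose(Kc.sum(axis=1), 0))  # True: H projects out the constant direction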

def kcit

import scipy.stats as stats  # needed for the gamma quantile below

def kcit(data: pd.DataFrame, X: set, Y: set, Z: set = None, regulation: float = 1e-3, alpha: float = 0.05, unit_variance: bool = False, **test_kwarg) -> bool:
    # hyperparameter setting: the kernel width for the conditional test depends on the sample size
    hyper_parameter = dict()

    if len(data) < 200: hyper_parameter['width'] = 0.8
    elif len(data) > 1200: hyper_parameter['width'] = 0.3
    else: hyper_parameter['width'] = 0.5

    for kw, value in test_kwarg.items():
        hyper_parameter[kw] = value

    n = len(data)
    # Independence Test
    if Z is None:
        # Kx, Ky <- centered kernel matrices of X and Y (median-heuristic width)
        Kx = centralized_gaussian_kernel(data, X, unit_variance=unit_variance)
        Ky = centralized_gaussian_kernel(data, Y, unit_variance=unit_variance)

        # Moment-matched gamma approximation of the null distribution
        T = np.matmul(Kx, Ky).trace() / n
        E_T = Kx.trace() * Ky.trace() / (n**2)
        V_T = 2 * np.matmul(Kx, Kx).trace() * np.matmul(Ky, Ky).trace() / (n**4)

        k = E_T**2 / V_T
        theta = V_T / E_T

    # Conditional Independence Test
    else:
        # The test statistic uses the joint variable (X, Z) on the X side
        X = X | Z

        # Kx, Ky, Kz <- centered kernel matrices of (X, Z), Y, Z with width hyper_parameter['width']
        Kx = centralized_gaussian_kernel(data, X, hyper_parameter['width'], unit_variance)
        Ky = centralized_gaussian_kernel(data, Y, hyper_parameter['width'], unit_variance)
        Kz = centralized_gaussian_kernel(data, Z, hyper_parameter['width'], unit_variance)

        # Regularized projection Rz = eps * (Kz + eps * I)^(-1) removes the part explained by Z
        Rz = regulation * np.linalg.inv(Kz + regulation * np.identity(n))
        Kxz = np.matmul(np.matmul(Rz, Kx), Rz)
        Kyz = np.matmul(np.matmul(Rz, Ky), Rz)

        Lxz, Vx = np.linalg.eig(Kxz)
        Lyz, Vy = np.linalg.eig(Kyz)

        # sort eigenvalues and corresponding eigenvectors (ascending)
        idx = Lxz.argsort()
        Lxz, Vx = Lxz[idx], np.real(Vx[:, idx])
        idx = Lyz.argsort()
        Lyz, Vy = Lyz[idx], np.real(Vy[:, idx])

        # W[t] = inner product of the t-th rows of Vx and Vy, i.e. diag(Vx @ Vy.T)
        W = np.zeros(n)
        for t in range(n):
            W[t] = np.inner(Vx[t], Vy[t])

        # W <- W @ W.T: treating W as a 1 x n matrix, this is the scalar sum_t W[t]^2
        W = np.asmatrix(W)
        W = np.matmul(W, W.transpose())

        # Moment-matched gamma approximation of the null distribution
        T = np.matmul(Kxz, Kyz).trace().item() / n
        E_T = W.trace().item() / n
        V_T = 2 * np.matmul(W, W).trace().item() / (n**2)
    
        k = E_T**2 / V_T
        theta = V_T/E_T

        # In the paper, theta = V_T / E_T.
        # In practice, however, the power of the test decreases as n grows,
        # so I switch to theta = 1 / np.log10(E_T / V_T) when the power is too weak.
        # theta = 1/np.log10(E_T/V_T)
        

    # Critical value: (1 - alpha) quantile of the fitted gamma distribution
    cri = stats.gamma(a = k, scale = theta).ppf(1 - alpha)

    return T < cri  # True means independence is not rejected at level alpha
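
kcit returns True when the statistic T stays below the (1 - alpha) quantile of the fitted gamma, i.e. when independence is not rejected. If a p-value is preferred instead of a boolean, the same moment-matched gamma provides it through its survival function; this small helper is only a sketch (kcit above does not expose T, k, and theta, so they would have to be returned or recomputed):

import scipy.stats as stats

def gamma_pvalue(T: float, k: float, theta: float) -> float:
    # p-value under the moment-matched gamma null: small values reject independence
    return stats.gamma(a = k, scale = theta).sf(T)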

Example

import scipy.stats as stats

size = 50

data = pd.DataFrame()
data['A'] = stats.norm().rvs(size = size)
data['B'] = stats.norm().rvs(size = size)
# A and B are independent

kcit(data, {'A'}, {'B'})
True

import scipy.stats as stats

size = 1000

data = pd.DataFrame()
data['A'] =  stats.norm().rvs(size = size)
data['B'] =  stats.norm().rvs(size = size)
data['C'] =  data['A'] + data['B'] + stats.norm().rvs(size = size)
# A -> C <- B

kcit(data, {'A'}, {'B'})
True
kcit(data, {'A'}, {'B'}, {'C'})
False

import scipy.stats as stats

size = 1000

data = pd.DataFrame()
data['A'] =  stats.gamma(a = 3).rvs(size = size)  # A is not normally distributed
data['B'] =  stats.beta(a = 2, b = 3).rvs(size = size) + stats.norm().rvs(size = size)  # B is not normally distributed
data['C'] =  1/(data['A'] + data['B']) + stats.norm().rvs(size = size)  # C = 1/(A+B) + e
data['D'] =  data['A'] ** data['B'] + stats.norm().rvs(size = size)     # D = A^B + e
# A -> C <- B
# A -> D <- B
# Therefore, C and D are conditionally independent only given both A and B

kcit(data, {'C'}, {'D'})
False
kcit(data, {'C'}, {'D'}, {'A'})
False
kcit(data, {'C'}, {'D'}, {'A', 'B'})
True

Reference

[2] Zhang, K., Peters, J., Janzing, D., & Schölkopf, B. (2011). Kernel-based conditional independence test and application in causal discovery. In Proceedings of the 27th Conference on Uncertainty in Artificial Intelligence (UAI 2011).
