ロジスティック回帰祭りによる因果探索[明確には因果探索ではありません](付録で実装したコードあります!)

最初に

追記:事前に念押ししておきますが、この手法はロジスティック回帰で関係性の強さを測り、AICBICで関係の方向性を決めるというやり方で、関係性の強さと方向の示唆を得るものだと思ってください。厳密には因果関係を探っているのではなく、関係の強さと方向を探っているということです。その時に交絡因子などを考慮して変数間の関係の強さを出すために、条件付きロジスティック回帰を使ったりしてる感じです。なので最後の方にも書いてありますが、初手の因果探索的な使い方として、大まかな関係性の強さや方向性の示唆を得るのには有効かなぐらいにとらえてください。ロジスティック回帰ではもちろん因果関係はわかりません。何度も言いますが関係性の強さだけです。

ロジスティック回帰祭りによる因果探索という話をこの前聞いたので、気になったので記事にしてみることにしました。大まかな話をすると、因果探索の初期段階で使われることが多い様です。

理論

まず、ロジスティック回帰の基本的な形式から始めましょう。

2つの変数 XY の間の因果関係を考える場合、ロジスティック回帰モデルは以下のように表されます
P(Y=1|X) = \frac{1}{1 + e^{-(β_0 + β_1X)}}
ここで、P(Y=1|X)X が与えられたときに Y が1となる確率を表します。β_0 は切片、β_1X の係数です。このモデルでは、X から Y への因果効果の強さを β_1 の大きさで評価します。

  
因果探索においては、このモデルを全ての変数ペアに対して適用します。

例えば、変数 XYZ がある場合、以下の6つのモデルを考えます
P(Y=1|X)
P(X=1|Y)
P(Z=1|X)
P(X=1|Z)
P(Z=1|Y)
P(Y=1|Z)

  

各モデルの係数 β_1 の大きさと統計的有意性を比較することで、変数間の因果関係の強さと方向性を推定します。
しかし、この手法には限界があります。特に、交絡因子の存在を考慮できないという問題があります。例えば、XY の関係が実際には Z によって説明される場合、単純なペアワイズの比較では誤った結論を導く可能性があります。

この問題に対処するため、条件付きロジスティック回帰を使用することがあります。例えば

P(Y=1|X,Z) = \frac{1}{1 + e^{-(β_0 + β_1X + β_2Z)}}
このモデルでは、Z の影響を制御しつつ、X から Y への因果効果を推定できます。

さらに、因果の方向性を決定するために、モデルの適合度を比較することもあります。例えば、$X → Y$ と $Y → X$ のモデルを比較し、より適合度の高いモデルを選択します。適合度の指標としては、AIC赤池情報量規準)やBICベイズ情報量規準)などが用いられます

AIC = -2\log L + 2k
BIC = -2\log L + k\log n
ここで、L は最大尤度、k はパラメータ数、n はサンプルサイズです。

これらの手法を組み合わせることで、ロジスティック回帰を用いた因果探索が行われます。ただし、この方法は探索的な分析に適しており、真の因果関係を確定するためには、実験的な検証や他の因果推論手法との組み合わせが必要です。また、潜在変数や非線形な関係性、時間的な依存関係など、より複雑な因果構造を扱うためには、構造方程式モデリングや時系列分析などの高度な手法が必要となります。
ということで、最初の大雑把な因果の可能性の探索には最適そうだということがわかるかと思います。ロジスティック回帰祭りで、大雑把な因果探索を行うというのは案外いいかもしれませんね。

付録(実装してみた)

ロジックとしては、条件付きロジスティック回帰を行っていて、AICを用いて矢印の方向を決定しています。
因みにpydotはcondaでインストールしたほうが良いです。pipだと別途Graphbizというものをインストールする必要があるようです。

class CausalExplorer:
    def __init__(self, data, variable_names):
        self.data = data
        self.variable_names = variable_names
        self.relationships = []

    def explore(self):
        self._perform_conditional_regression()

    def _perform_conditional_regression(self):
        n_vars = len(self.variable_names)
        for i, j in combinations(range(n_vars), 2):
            control_indices = [k for k in range(n_vars) if k != i and k != j]
            try:
                coef_ij, aic_ij = self._conditional_logistic_regression_with_aic(
                    self.data[:, i], self.data[:, j], self.data[:, control_indices]
                )
                coef_ji, aic_ji = self._conditional_logistic_regression_with_aic(
                    self.data[:, j], self.data[:, i], self.data[:, control_indices]
                )
                
                if aic_ij < aic_ji:
                    self.relationships.append((
                        self.variable_names[i],
                        self.variable_names[j],
                        coef_ij,
                        aic_ij,
                        [self.variable_names[k] for k in control_indices]
                    ))
                else:
                    self.relationships.append((
                        self.variable_names[j],
                        self.variable_names[i],
                        coef_ji,
                        aic_ji,
                        [self.variable_names[k] for k in control_indices]
                    ))
            except ValueError as e:
                print(f"Error in regression between {self.variable_names[i]} and {self.variable_names[j]}: {str(e)}")

    def _conditional_logistic_regression_with_aic(self, X, Y, Z):
        X_binary = self._ensure_binary(X)
        Y_binary = self._ensure_binary(Y)
        Z_binary = np.apply_along_axis(self._ensure_binary, 0, Z)
        
        X_with_controls = np.column_stack((X_binary.reshape(-1, 1), Z_binary))
        
        if len(np.unique(Y_binary)) < 2:
            raise ValueError(f"Target variable has only one class: {np.unique(Y_binary)[0]}")
        
        model = LogisticRegression()
        model.fit(X_with_controls, Y_binary)
        
        coef = model.coef_[0][0]
        
        n = len(Y)
        k = X_with_controls.shape[1] + 1
        ll = model.score(X_with_controls, Y_binary) * n
        aic = 2 * k - 2 * ll
        
        return coef, aic

    def _ensure_binary(self, X):
        unique_values = np.unique(X)
        if len(unique_values) == 2:
            return (X == unique_values[1]).astype(int)
        elif len(unique_values) > 2:
            median = np.median(X)
            return (X > median).astype(int)
        else:
            raise ValueError(f"Cannot binarize data with {len(unique_values)} unique values")
    
    def create_dag_plot(self, filename, threshold=0.1):
        # 有向グラフの作成
        graph = pydot.Dot(graph_type='digraph', rankdir='LR')
        
        # 最大の重み(係数の絶対値)を計算
        max_weight = max(abs(coef) for _, _, coef, _, _ in self.relationships if abs(coef) > threshold)
        
        # ノードの追加
        for var in self.variable_names:
            node = pydot.Node(var, shape="circle")
            graph.add_node(node)
        
        # エッジ(矢印)の追加
        for source, target, coef, _, _ in self.relationships:
            if abs(coef) > threshold:
                # 係数の大きさに基づいて色を設定(赤色の濃さ)
                color = f"#{int(255 * (1 - abs(coef)/max_weight)):02x}0000"
                # エッジの作成(ラベル、色、太さを設定)
                edge = pydot.Edge(source, target, label=f"{coef:.2f}", color=color, penwidth=1 + 2 * abs(coef)/max_weight)
                graph.add_edge(edge)
        
        # グラフをPNG画像として保存
        graph.write_png(f"{filename}.png")
        print(f"DAG image saved as {filename}.png")

    def print_relationships(self, threshold=0.1):
        print("Conditional relationships:")
        # 閾値を超える関係性のみを表示
        for source, target, coef, aic, controls in self.relationships:
            if abs(coef) > threshold:
                print(f"{source} -> {target} | {', '.join(controls)}: coefficient = {coef:.3f}, AIC = {aic:.3f}")

実際に実行してみるコードが以下です

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import log_loss
import pydot
from itertools import combinations

# サンプルデータの生成
np.random.seed(42)
n_samples = 1000

# 基本変数の生成
A = np.random.randn(n_samples)
B = 0.5 * A + np.random.randn(n_samples)
C = 0.3 * A + 0.7 * B + np.random.randn(n_samples)

# 追加変数の生成
D = 0.4 * B + 0.6 * C + np.random.randn(n_samples)
E = 0.2 * A + 0.3 * C + np.random.randn(n_samples)
F = 0.5 * D + 0.5 * E + np.random.randn(n_samples)
G = 0.4 * C + 0.3 * E + 0.3 * F + np.random.randn(n_samples)

# データフレームの作成
data = pd.DataFrame({
    'A': A, 'B': B, 'C': C, 'D': D, 'E': E, 'F': F, 'G': G
})

# データの標準化
scaler = StandardScaler()
data_scaled = scaler.fit_transform(data)

# 変数名のリスト
variable_names = list(data.columns)

# CausalExplorerの初期化と実行
explorer = CausalExplorer(data_scaled, variable_names)
explorer.explore()

# 結果の表示
explorer.print_relationships(threshold=0.1)

# DAG画像の作成
explorer.create_dag_plot("causal_diagram", threshold=0.1)

# 相関係数の表示
correlation_matrix = data.corr()
print("\nCorrelation matrix:")
print(correlation_matrix)

実行結果は以下です。

実行結果
意外といい感じに実装できたのではないでしょうか?
因果探索の初手としては結構いい気がしますね!
グラフの見方としては、DAG的な感じで、数値はロジスティック回帰の係数を表しています。また、係数の絶対値が大きいほど線が太く、色が濃くなっているはずです。係数の絶対値が大きいほど、因果関係が強いとみてよいというのは、事前の理論でお話したので、そのように見ればいいものです。ただし、ロジスティック回帰での因果探索は、因果の可能性を初手で探るのに適しているというくらいで、実際にそのくらいの因果があるとは見てはいけないことは注意書きとして書いておきます。

追記(付録の更新)

更新したコードです。更新点としては、AICによる方向の決定をしないDAGも出力するようにする部分です。

class CausalExplorer:
    def __init__(self, data, variable_names):
        # データと変数名を初期化
        self.data = data
        self.variable_names = variable_names
        self.relationships = []

    def explore(self):
        # 条件付きロジスティック回帰を使用して因果関係を探索
        self._perform_conditional_regression()
        # 通常のDAGと双方向のDAGを生成
        self.create_dag_plot("causal_diagram")
        self.create_bidirectional_dag_plot("causal_diagram")

    def _perform_conditional_regression(self):
        n_vars = len(self.variable_names)
        # すべての変数の組み合わせに対して条件付きロジスティック回帰を実行
        for i, j in combinations(range(n_vars), 2):
            # 制御変数のインデックスリストを作成
            control_indices = [k for k in range(n_vars) if k != i and k != j]
            try:
                # i -> j の方向で回帰を実行
                coef_ij, aic_ij = self._conditional_logistic_regression_with_aic(
                    self.data[:, i], self.data[:, j], self.data[:, control_indices]
                )
                # j -> i の方向で回帰を実行
                coef_ji, aic_ji = self._conditional_logistic_regression_with_aic(
                    self.data[:, j], self.data[:, i], self.data[:, control_indices]
                )
                
                # AICが小さい方向を選択
                if aic_ij < aic_ji:
                    self.relationships.append((
                        self.variable_names[i],
                        self.variable_names[j],
                        coef_ij,
                        aic_ij,
                        [self.variable_names[k] for k in control_indices]
                    ))
                else:
                    self.relationships.append((
                        self.variable_names[j],
                        self.variable_names[i],
                        coef_ji,
                        aic_ji,
                        [self.variable_names[k] for k in control_indices]
                    ))
            except ValueError as e:
                print(f"Error in regression between {self.variable_names[i]} and {self.variable_names[j]}: {str(e)}")

    def _conditional_logistic_regression_with_aic(self, X, Y, Z):
        # Xを説明変数、Yを目的変数、Zを制御変数とする条件付きロジスティック回帰
        X_binary = self._ensure_binary(X)
        Y_binary = self._ensure_binary(Y)
        Z_binary = np.apply_along_axis(self._ensure_binary, 0, Z)
        
        X_with_controls = np.column_stack((X_binary.reshape(-1, 1), Z_binary))
        
        if len(np.unique(Y_binary)) < 2:
            raise ValueError(f"Target variable has only one class: {np.unique(Y_binary)[0]}")
        
        # ロジスティック回帰モデルの作成と学習
        model = LogisticRegression()
        model.fit(X_with_controls, Y_binary)
        
        # Xに対応する係数を取得
        coef = model.coef_[0][0]
        
        # AICの計算
        n = len(Y)
        k = X_with_controls.shape[1] + 1  # パラメータ数(切片を含む)
        ll = model.score(X_with_controls, Y_binary) * n  # 対数尤度
        aic = 2 * k - 2 * ll
        
        return coef, aic

    def _ensure_binary(self, X):
        # データを二値化する
        unique_values = np.unique(X)
        if len(unique_values) == 2:
            return (X == unique_values[1]).astype(int)
        elif len(unique_values) > 2:
            median = np.median(X)
            return (X > median).astype(int)
        else:
            raise ValueError(f"Cannot binarize data with {len(unique_values)} unique values")

    def create_dag_plot(self, filename, threshold=0.1):
        # 有向グラフの作成
        graph = pydot.Dot(graph_type='digraph', rankdir='LR')
        
        # 最大の重み(係数の絶対値)を計算
        max_weight = max(abs(coef) for _, _, coef, _, _ in self.relationships if abs(coef) > threshold)
        
        # ノードの追加
        for var in self.variable_names:
            node = pydot.Node(var, shape="circle")
            graph.add_node(node)
        
        # エッジ(矢印)の追加
        for source, target, coef, _, _ in self.relationships:
            if abs(coef) > threshold:
                # 係数の大きさに基づいて色を設定(赤色の濃さ)
                color = f"#{int(255 * (1 - abs(coef)/max_weight)):02x}0000"
                # エッジの作成(ラベル、色、太さを設定)
                edge = pydot.Edge(source, target, label=f"{coef:.2f}", color=color, penwidth=1 + 2 * abs(coef)/max_weight)
                graph.add_edge(edge)
        
        # グラフをPNG画像として保存
        graph.write_png(f"{filename}.png")
        print(f"DAG image saved as {filename}.png")

    def create_bidirectional_dag_plot(self, filename, threshold=0.1):
        # 双方向の有向グラフの作成
        graph = pydot.Dot(graph_type='digraph', rankdir='LR')
        
        # 最大の重み(係数の絶対値)を計算
        max_weight = max(abs(coef) for _, _, coef, _, _ in self.relationships if abs(coef) > threshold)
        
        # ノードの追加
        for var in self.variable_names:
            node = pydot.Node(var, shape="circle")
            graph.add_node(node)
        
        # 双方向エッジの追加
        added_edges = set()
        for source, target, coef, _, _ in self.relationships:
            if abs(coef) > threshold:
                if (source, target) not in added_edges and (target, source) not in added_edges:
                    # 係数の大きさに基づいて色を設定(赤色の濃さ)
                    color = f"#{int(255 * (1 - abs(coef)/max_weight)):02x}0000"
                    # 双方向エッジの作成(ラベル、色、太さを設定)
                    edge = pydot.Edge(source, target, label=f"{coef:.2f}", color=color, penwidth=1 + 2 * abs(coef)/max_weight, dir='both')
                    graph.add_edge(edge)
                    added_edges.add((source, target))
        
        # グラフをPNG画像として保存
        graph.write_png(f"{filename}_bidirectional.png")
        print(f"Bidirectional DAG image saved as {filename}_bidirectional.png")

    def print_relationships(self, threshold=0.1):
        print("Conditional relationships:")
        # 閾値を超える関係性のみを表示
        for source, target, coef, aic, controls in self.relationships:
            if abs(coef) > threshold:
                print(f"{source} -> {target} | {', '.join(controls)}: coefficient = {coef:.3f}, AIC = {aic:.3f}")

実行してみるコードはほぼ変わらないですが

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import log_loss
import pydot
from itertools import combinations

# サンプルデータの生成
np.random.seed(42)
n_samples = 1000

# 基本変数の生成
A = np.random.randn(n_samples)
B = 0.5 * A + np.random.randn(n_samples)
C = 0.3 * A + 0.7 * B + np.random.randn(n_samples)

# 追加変数の生成
D = 0.4 * B + 0.6 * C + np.random.randn(n_samples)
E = 0.2 * A + 0.3 * C + np.random.randn(n_samples)
F = 0.5 * D + 0.5 * E + np.random.randn(n_samples)
G = 0.4 * C + 0.3 * E + 0.3 * F + np.random.randn(n_samples)

# データフレームの作成
data = pd.DataFrame({
    'A': A, 'B': B, 'C': C, 'D': D, 'E': E, 'F': F, 'G': G
})

# データの標準化
scaler = StandardScaler()
data_scaled = scaler.fit_transform(data)

# 変数名のリスト
variable_names = list(data.columns)

# CausalExplorerの初期化と実行
explorer = CausalExplorer(data_scaled, variable_names)
explorer.explore()

# 結果の表示
explorer.print_relationships(threshold=0.1)

# 相関係数の表示
correlation_matrix = data.corr()
print("\nCorrelation matrix:")
print(correlation_matrix)

タイタニックのデータで因果探索のコードを走らせてみたら、色々と問題があったので、大元のコードも変更加えてます。また、AICで方向を決定するのはいいんですが、誤解を生みかねない部分もあるので、双方向の矢印のものも出力するようにしたコードが上記です。これはタイタニックのデータでやった時に、生存結果からほかの変数の方に矢印が出てて、矢印の方向ちがくね?これを実務で使ったらいらぬ誤解と、信用してもらえないよなと思ったので、双方向のもプラスした次第です。
一応ですが、thresholdの部分で、設定未満の線は表示されないようになるようにしてあります。

ついでにAICではなくBICで方向選択するバージョンも

class CausalExplorer:
    def __init__(self, data, variable_names):
        # データと変数名を初期化
        self.data = data
        self.variable_names = variable_names
        self.relationships = []

    def explore(self):
        # 条件付きロジスティック回帰を使用して因果関係を探索
        self._perform_conditional_regression()
        # 通常のDAGと双方向のDAGを生成
        self.create_dag_plot("causal_diagram")
        self.create_bidirectional_dag_plot("causal_diagram")

    def _perform_conditional_regression(self):
        n_vars = len(self.variable_names)
        # すべての変数の組み合わせに対して条件付きロジスティック回帰を実行
        for i, j in combinations(range(n_vars), 2):
            # 制御変数のインデックスリストを作成
            control_indices = [k for k in range(n_vars) if k != i and k != j]
            try:
                # i -> j の方向で回帰を実行
                coef_ij, bic_ij = self._conditional_logistic_regression_with_bic(
                    self.data[:, i], self.data[:, j], self.data[:, control_indices]
                )
                # j -> i の方向で回帰を実行
                coef_ji, bic_ji = self._conditional_logistic_regression_with_bic(
                    self.data[:, j], self.data[:, i], self.data[:, control_indices]
                )
                
                # BICが小さい方向を選択 (BICが小さいほどモデルの適合度が高い)
                if bic_ij < bic_ji:
                    self.relationships.append((
                        self.variable_names[i],
                        self.variable_names[j],
                        coef_ij,
                        bic_ij,
                        [self.variable_names[k] for k in control_indices]
                    ))
                else:
                    self.relationships.append((
                        self.variable_names[j],
                        self.variable_names[i],
                        coef_ji,
                        bic_ji,
                        [self.variable_names[k] for k in control_indices]
                    ))
            except ValueError as e:
                print(f"Error in regression between {self.variable_names[i]} and {self.variable_names[j]}: {str(e)}")

    def _conditional_logistic_regression_with_bic(self, X, Y, Z):
        # Xを説明変数、Yを目的変数、Zを制御変数とする条件付きロジスティック回帰
        X_binary = self._ensure_binary(X)
        Y_binary = self._ensure_binary(Y)
        Z_binary = np.apply_along_axis(self._ensure_binary, 0, Z)
        
        X_with_controls = np.column_stack((X_binary.reshape(-1, 1), Z_binary))
        
        if len(np.unique(Y_binary)) < 2:
            raise ValueError(f"Target variable has only one class: {np.unique(Y_binary)[0]}")
        
        # ロジスティック回帰モデルの作成と学習
        model = LogisticRegression()
        model.fit(X_with_controls, Y_binary)
        
        # Xに対応する係数を取得
        coef = model.coef_[0][0]
        
        # BICの計算
        n = len(Y)
        k = X_with_controls.shape[1] + 1  # パラメータ数(切片を含む)
        ll = model.score(X_with_controls, Y_binary) * n  # 対数尤度
        bic = np.log(n) * k - 2 * ll  # BICの計算式
        
        return coef, bic

    def _ensure_binary(self, X):
        # データを二値化する
        unique_values = np.unique(X)
        if len(unique_values) == 2:
            return (X == unique_values[1]).astype(int)
        elif len(unique_values) > 2:
            median = np.median(X)
            return (X > median).astype(int)
        else:
            raise ValueError(f"Cannot binarize data with {len(unique_values)} unique values")

    def create_dag_plot(self, filename, threshold=0.1):
        # 有向グラフの作成
        graph = pydot.Dot(graph_type='digraph', rankdir='LR')
        
        # 最大の重み(係数の絶対値)を計算
        max_weight = max(abs(coef) for _, _, coef, _, _ in self.relationships if abs(coef) > threshold)
        
        # ノードの追加
        for var in self.variable_names:
            node = pydot.Node(var, shape="circle")
            graph.add_node(node)
        
        # エッジ(矢印)の追加
        for source, target, coef, _, _ in self.relationships:
            if abs(coef) > threshold:
                # 係数の大きさに基づいて色を設定(赤色の濃さ)
                color = f"#{int(255 * (1 - abs(coef)/max_weight)):02x}0000"
                # エッジの作成(ラベル、色、太さを設定)
                edge = pydot.Edge(source, target, label=f"{coef:.2f}", color=color, penwidth=1 + 2 * abs(coef)/max_weight)
                graph.add_edge(edge)
        
        # グラフをPNG画像として保存
        graph.write_png(f"{filename}.png")
        print(f"DAG image saved as {filename}.png")

    def create_bidirectional_dag_plot(self, filename, threshold=0.1):
        # 双方向の有向グラフの作成
        graph = pydot.Dot(graph_type='digraph', rankdir='LR')
        
        # 最大の重み(係数の絶対値)を計算
        max_weight = max(abs(coef) for _, _, coef, _, _ in self.relationships if abs(coef) > threshold)
        
        # ノードの追加
        for var in self.variable_names:
            node = pydot.Node(var, shape="circle")
            graph.add_node(node)
        
        # 双方向エッジの追加
        added_edges = set()
        for source, target, coef, _, _ in self.relationships:
            if abs(coef) > threshold:
                if (source, target) not in added_edges and (target, source) not in added_edges:
                    # 係数の大きさに基づいて色を設定(赤色の濃さ)
                    color = f"#{int(255 * (1 - abs(coef)/max_weight)):02x}0000"
                    # 双方向エッジの作成(ラベル、色、太さを設定)
                    edge = pydot.Edge(source, target, label=f"{coef:.2f}", color=color, penwidth=1 + 2 * abs(coef)/max_weight, dir='both')
                    graph.add_edge(edge)
                    added_edges.add((source, target))
        
        # グラフをPNG画像として保存
        graph.write_png(f"{filename}_bidirectional.png")
        print(f"Bidirectional DAG image saved as {filename}_bidirectional.png")

    def print_relationships(self, threshold=0.1):
        print("Conditional relationships:")
        # 閾値を超える関係性のみを表示
        for source, target, coef, bic, controls in self.relationships:
            if abs(coef) > threshold:
                print(f"{source} -> {target} | {', '.join(controls)}: coefficient = {coef:.3f}, BIC = {bic:.3f}")

実行コードは同じなので省きます

最後に

日常で聞いた、ロジスティック回帰祭りで因果探索というのが気になったので、自分で調べて簡単なツールを作ってみましたという記事だったのですが、いかがだったでしょうか?
最終的に得られた因果探索のグラフなんかは個人的に結構満足な出来だったのですが(笑)
個人的に因果推論関係に興味が最近はずっとあるので、こういう系の記事を書いていくことが多いと思います。この流れで、LiNGAMやベイジアンネットワークの記事でも書いてみようかと思っています。 読んでくださりありがとうございました!