IPWと多重代入法を用いた欠損値ありデータに対する傾向スコアマッチング

はじめに

最近、欠損値ありデータでの傾向スコアマッチングを実装する機会があったので、見聞きはしていたIPWと多重代入法を用いた傾向スコアマッチングを実装してみました。なので、今回はその理論的な裏付けに関して考察していき、実装に関しても扱っていきたいと思います。
傾向スコアマッチング的な考え方は、効果検証以外の文脈でも個人的には使用できると感じていて、潜在的変数の確率分布の近しい人同士をマッチングするという考え方は、色々な部分で応用が利くのではないかと考えている今日この頃です。それでは本題に入っていきましょう。

IPWと多重代入法を用いた欠損ありデータに対する傾向スコアマッチングの理論

傾向スコアマッチング自体は因果推論の文脈で頻繁に使用されてきました。ですが、観測データに欠損値が発生するのはつきものというか必然といってもいいと思います。傾向スコアを算出する上で最も良いとされているのは、ロジスティック回帰です。これは、傾向スコアそのものの目的が介入確率を正確に算出することでなく、潜在的変数の確率分布を傾向スコアという形で表すというものだからだと勝手に理解しています。(介入確率の正確な予測ではなく、共変量の分布を要約すること)傾向スコアがほぼ同一な対象者同士は潜在的変数の確率分布がほぼ同一であるとみなすことができるから、対象群のマッチング者を介入群の潜在的結果(介入されなかった場合の結果)として扱うことによってATT(介入群における介入による効果量)が算出できるのだと理解しています。長くなりましたが、これらの理由から正確な介入確率を求めるというのは、傾向スコアの算出の目的からずれており、傾向スコア算出の目的から一番適切な手法は、勾配ブースティング系のモデルなどではなく、ロジスティック回帰だということは主張できるかと思います。(完全に私の勝手な解釈なので間違っている可能性は極めて高い内容ですが)
前置きが長くなりましたが、傾向スコアをロジスティック回帰で算出するときに重要となってくるのは、交絡因子をモデルに変数として入れ込むことだとされています。交絡因子といえばでよく出てくるのは、人の属性を表す属性変数です。ですが、この属性変数欠損が多く発生していることが多くあるのです。なぜならば、属性値はすなわち顧客の個人情報なので、個人情報を顧客が開示しない限り欠損してしまうわけです。前置きが長くなりましたが、こういった事情で観察データに対する因果推論をするときに、欠損値が発生しているデータを扱わざる負えないことは多々あるというわけです。
そんな観察データに対する因果推論の解決策として、今回はIPW(逆確率重みづけ)と多重代入法を組み合わせた手法を使おうとなるわけです。
  

まず、処置効果の推定における基本的な枠組みから始めましょう。処置群をT=1、対照群をT=0とし、結果変数をYとします。因果効果として平均処置効果(ATE)を考えると、以下のように表されます。
ATE = E[Y(1) - Y(0)]
ここで、Y(1)は処置を受けた場合の潜在的結果、Y(0)は処置を受けなかった場合の潜在的結果を表します。
傾向スコアは、共変量Xが与えられた下での処置割り当ての条件付き確率として定義されます。
e(X) = P(T=1|X)
しかし、実際のデータでは欠損値が存在する場合が多く、これを$R$で表します。R=1はデータが観測された場合、R=0は欠損している場合を示します。このとき、欠損メカニズムがMAR(Missing At Random)であると仮定すると、欠損は観測された変数に依存して生じると考えられます。

IPWを用いた推定量は、欠損を考慮して以下のように拡張されます。

\hat{ATE}{IPW} = \frac{1}{n}\sum{i=1}^n \frac{R_iT_iY_i}{e(X_i)\pi_i} - \frac{R_i(1-T_i)Y_i}{(1-e(X_i))\pi_i}
ここで、\pi_iは欠損確率の逆数を表す重みです。これにより、観測されたデータに適切な重みを付けることで、欠損による偏りを補正することができます。

多重代入法は、欠損値を複数回補完することで、欠損値の不確実性を考慮します。具体的には以下の手順で実施されます。

\begin{aligned} &\text{Step 1: } X_{mis}^{(m)} \sim P(X_{mis}|X_{obs}) \ &\text{Step 2: } \hat{\theta}^{(m)} = f(X_{obs}, X_{mis}^{(m)}) \ &\text{Step 3: } \hat{\theta} = \frac{1}{M}\sum_{m=1}^M \hat{\theta}^{(m)} \end{aligned}

この手法の有効性は、以下の点に集約されます。IPWは観測データに適切な重みを付けることで選択バイアスを補正し、多重代入法は欠損値の不確実性を考慮することで推定の精度を向上させます。特に、大規模なデータセットや、欠損メカニズムがMARである場合に効果的です。
  
ただこのIPWと多重代入法の活用に関しては、注意点もあります。第一に、欠損メカニズムがMNAR(Missing Not At Random)の場合、推定にバイアスが生じる可能性があります。第二に、傾向スコアモデルの誤特定は結果に大きな影響を与える可能性があります。第三に、計算負荷が高く、特に大規模データセットでは実装が困難になる場合があります。
特に注意すべきは、多重代入法の補完回数です。一般的に欠損率が高い場合はより多くの代入回数が必要とされます。ですが、多くすると計算負荷が大きくなるため、ちょうどいい補完回数を見つけるのが困難な可能性もあります。欠損率が非常に高い場合は最低20回は担保することが必要なようです(個人調べ)
また、IPWの極端な重みの発生がある場合は、推定値の分散が大きくなる可能性があるため、重みの安定化や切断が検討されるべきだとされています。

実際に実装してみた

import pandas as pd
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from scipy import stats
from statsmodels.stats.multitest import multipletests
import warnings
warnings.filterwarnings('ignore')

class PSMatchingWithMissing:
    """
    IPWと多重代入法を用いた欠損値対応の傾向スコアマッチングを行うクラス
    """
    
    def __init__(self, n_imp=5, caliper=0.2, max_ipw_ratio=10):
        """
        Parameters
        ----------
        n_imp : int, optional (default=5)
            多重代入法の実行回数
        caliper : float, optional (default=0.2)
            マッチングの許容範囲(標準偏差の何倍まで許容するか)
        max_ipw_ratio : float, optional (default=10)
            IPWの重みの最大比率(極端な重みを制限するため)
        """
        self.n_imp = n_imp
        self.caliper = caliper
        self.max_ipw_ratio = max_ipw_ratio
        self.imputer = None
        self.ps_model = None
        self.matched_pairs = None
        self.ipw_weights = None
        self.propensity_scores = None  # 傾向スコアを保存するための属性を追加
    
    def _check_missing_pattern(self, X):
        """
        欠損パターンを分析し、MARの可能性を評価する
        
        Returns
        -------
        dict : 検定結果と判定
        """
        # 欠損値の相関を確認
        missing_matrix = X.isnull().astype(int)
        corr_matrix = missing_matrix.corr()
        
        # Little's MCARテストの代替として各変数ペアの関連を確認
        n_vars = len(X.columns)
        p_values = []
        
        for i in range(n_vars):
            for j in range(i+1, n_vars):
                # カイ二乗検定で欠損パターンの独立性を確認
                chi2, p_val = stats.chi2_contingency(pd.crosstab(
                    missing_matrix.iloc[:, i],
                    missing_matrix.iloc[:, j]
                ))[:2]
                p_values.append(p_val)
        
        # 多重検定の補正
        if p_values:
            rejected, p_vals_corrected, _, _ = multipletests(p_values, method='bonferroni')
            
            return {
                'is_likely_mar': any(rejected),
                'correlation_matrix': corr_matrix,
                'p_values': p_vals_corrected,
                'judgment': '欠損メカニズムはMARである可能性が高い' if any(rejected) else 'MCARの可能性がある'
            }
        return {
            'is_likely_mar': None,
            'correlation_matrix': corr_matrix,
            'p_values': None,
            'judgment': '欠損パターンの評価に十分なデータがありません'
        }

    def _perform_multiple_imputation(self, X):
        """
        多重代入法を実行する
        """
        self.imputer = IterativeImputer(random_state=42, max_iter=10)
        imputed_datasets = []
        
        for _ in range(self.n_imp):
            imputed = pd.DataFrame(
                self.imputer.fit_transform(X),
                columns=X.columns,
                index=X.index
            )
            imputed_datasets.append(imputed)
            
        return imputed_datasets

    def _calculate_propensity_scores(self, X, treatment):
        """
        傾向スコアを計算する
        """
        self.ps_model = LogisticRegression(random_state=42)
        self.ps_model.fit(X, treatment)
        return self.ps_model.predict_proba(X)[:, 1]

    def _calculate_ipw(self, ps_scores, treatment):
        """
        IPWの重みを計算し、極端な重みを補正する
        """
        # 処置群と対照群それぞれの重みを計算
        weights = np.where(treatment == 1,
                          1/ps_scores,
                          1/(1-ps_scores))
        
        # 極端な重みを制限
        median_weight = np.median(weights)
        max_weight = median_weight * self.max_ipw_ratio
        weights = np.minimum(weights, max_weight)
        
        # 重みを標準化
        weights = weights / np.mean(weights)
        
        return weights

    def _find_matches(self, ps_scores, treatment, df_index):
        """
        傾向スコアに基づいてマッチングを行う
        """
        treated_idx = df_index[treatment == 1]
        control_idx = df_index[treatment == 0]
        
        # マッチングの許容範囲(caliper)を計算
        ps_std = np.std(ps_scores)
        max_dist = self.caliper * ps_std
        
        matches = []
        
        # 各処置群のサンプルに対して最適な対照群を見つける
        for t_idx in treated_idx:
            t_ps = ps_scores[df_index == t_idx][0]
            
            # 対照群との距離を計算
            distances = abs(ps_scores[df_index.isin(control_idx)] - t_ps)
            
            if len(distances) > 0 and min(distances) <= max_dist:
                best_match_idx = control_idx[distances.argmin()]
                matches.append([t_idx, best_match_idx])
                # マッチングに使用した対照群を除外
                control_idx = control_idx[control_idx != best_match_idx]
        
        return pd.DataFrame(matches, columns=['treatment_idx', 'control_idx'])

    def fit_transform(self, df, treatment_col, covariates):
        """
        傾向スコアマッチングを実行する
        
        Parameters
        ----------
        df : pandas.DataFrame
            入力データ
        treatment_col : str
            処置を示す列名
        covariates : list
            共変量の列名リスト
        
        Returns
        -------
        pandas.DataFrame
            マッチング結果(傾向スコアを含む)
        """
        X = df[covariates].copy()
        treatment = df[treatment_col].values
        
        # 多重代入の実行
        imputed_datasets = self._perform_multiple_imputation(X)
        
        all_matches = []
        all_weights = []
        all_ps_scores = []
        
        # 各代入データセットでマッチングを実行
        for imp_data in imputed_datasets:
            # 傾向スコアの計算
            ps_scores = self._calculate_propensity_scores(imp_data, treatment)
            
            # IPWの計算
            weights = self._calculate_ipw(ps_scores, treatment)
            
            # マッチングの実行
            matches = self._find_matches(ps_scores, treatment, df.index)
            
            all_matches.append(matches)
            all_weights.append(weights)
            all_ps_scores.append(ps_scores)
        
        # 結果の統合
        self.matched_pairs = pd.concat(all_matches).drop_duplicates()
        self.ipw_weights = np.mean(all_weights, axis=0)
        # 傾向スコアの平均を計算
        self.propensity_scores = np.mean(all_ps_scores, axis=0)
        
        # 結果をデータフレームとして整形
        result_df = df.copy()
        result_df['matched_pair_id'] = np.nan
        result_df['ipw_weight'] = self.ipw_weights
        result_df['propensity_score'] = self.propensity_scores  # 傾向スコアを追加
        
        for idx, row in self.matched_pairs.iterrows():
            result_df.loc[row['treatment_idx'], 'matched_pair_id'] = idx
            result_df.loc[row['control_idx'], 'matched_pair_id'] = idx
        
        return result_df

    def check_balance(self, df, treatment_col, covariates):
        """
        共変量のバランスをチェックする
        
        Returns
        -------
        pandas.DataFrame
            バランス診断の結果
        """
        if self.matched_pairs is None:
            raise ValueError("マッチング実行前にバランスチェックはできません")
        
        balance_stats = []
        
        for var in covariates:
            # マッチング前の標準化差
            treated_before = df[df[treatment_col] == 1][var]
            control_before = df[df[treatment_col] == 0][var]
            
            # マッチング後のデータ
            matched_data = df[df['matched_pair_id'].notna()]
            treated_after = matched_data[matched_data[treatment_col] == 1][var]
            control_after = matched_data[matched_data[treatment_col] == 0][var]
            
            # 標準化差の計算
            std_diff_before = (np.mean(treated_before) - np.mean(control_before)) / \
                            np.sqrt((np.var(treated_before) + np.var(control_before)) / 2)
            
            std_diff_after = (np.mean(treated_after) - np.mean(control_after)) / \
                            np.sqrt((np.var(treated_after) + np.var(control_after)) / 2)
            
            balance_stats.append({
                'variable': var,
                'std_diff_before': std_diff_before,
                'std_diff_after': std_diff_after,
                'improvement': abs(std_diff_before) - abs(std_diff_after)
            })
        
        return pd.DataFrame(balance_stats)

    def check_missing_mechanism(self, df, covariates):
        """
        欠損メカニズムの評価を行う
        
        Returns
        -------
        dict
            欠損メカニズムの評価結果
        """
        X = df[covariates]
        return self._check_missing_pattern(X)

返ってくるdfはマッチング対象のindex値をカラムとしてつけてわかるようにしています。caliperがマッチングを傾向スコアのどのくらいの幅で許容するかになっています。ここを小さくするとより厳密なマッチングが行われますが、マッチングする組の数が少なくなる恐れがあります。n_impは多重代入法の補完回数で欠損率が大きいほど大きくする必要があります。様々な追加で作ったメソッドはご自由にお使いください。

最後に

見聞きはしていたIPWと多重代入法を用いた欠損データに対する傾向スコアマッチングに関して扱ってみました。また、傾向スコアに関して皆が比較的誤解しがちな、介入確率を正確に予測するモデルを作ることが目的だと思っていることに関しても、個人的な理解を書いて本当の目的を書いたつもりです。この記事がどなたかの理解の一助となれば幸いです。最後までお読みいただきありがとうございました。