はじめに
クラスタリングにおいて、どの変数が効いているのかを見たい的なことを思ったことはありませんか?正直私はなかったです。ですが、ビジネス側の人から寄与度的なものがあると嬉しいといわれたことをきっかけに、クラスタリング手法によらない寄与度的なものの算出方法を考えてみたという経緯があります。それでは内容に入っていきましょう。
寄与度の算出方法
分散比
F値
F値は分散分析(ANOVA: Analysis of Variance)において用いられる統計量で、以下の式で表されます
この統計量は、クラスター間の変動とクラスター内の変動の比を表しており、値が大きいほどクラスター間の差異が顕著であることを示します。F値は分散比とは異なり、グループ間の差異が偶然では説明できないほど大きいかを示します。
シルエット係数
シルエット係数は、クラスター内の凝集度とクラスター間の分離度を組み合わせた指標で、以下のように定義されます
各変数に対するシルエット係数は、その変数単独でクラスターの構造をどの程度説明できるかを示す指標として解釈できます。つまり、ある変数について高いシルエット係数が得られた場合、その変数の値だけを見ても、データポイントが所属するクラスターをよく特徴付けられることを意味します。
一方で、シルエット係数が低い変数は、単独ではクラスター構造の説明に寄与していないことを示します。ただし、これは必ずしもその変数が不要であることを意味するわけではありません。他の変数との組み合わせによって初めて重要な意味を持つ場合もあります。
このように、各変数のシルエット係数は、クラスタリング結果における各変数の「単独での説明力」を評価する指標として解釈できます。
実装
import numpy as np import pandas as pd from scipy import stats from sklearn.metrics import silhouette_samples from typing import Union, Dict from dataclasses import dataclass @dataclass class FValueResult: """F値の計算結果を格納するデータクラス Attributes: f_value (float): 計算されたF値 p_value (float): F値に対応するp値 between_ss (float): グループ間平方和 within_ss (float): グループ内平方和 between_df (int): グループ間自由度 within_df (int): グループ内自由度 """ f_value: float p_value: float between_ss: float within_ss: float between_df: int within_df: int class IntegratedClusterContributionAnalyzer: """クラスタリングにおける変数の寄与度を総合的に分析するクラス このクラスは以下の指標を統合して変数の寄与度を評価します: - 分散比:クラスター間分散とクラスター内分散の比 - F値:分散分析に基づく統計量 - シルエット係数:クラスターの品質指標 """ def __init__(self): """初期化メソッド""" # 分析結果を格納するDataFrame self.results = None # 特徴量名のリスト self.feature_names = None def _convert_input_data(self, X: Union[np.ndarray, pd.DataFrame], labels: Union[np.ndarray, pd.Series]) -> tuple: """入力データを適切な形式に変換する Args: X: 特徴量行列(numpy配列またはDataFrame) labels: クラスターラベル(numpy配列またはSeries) Returns: tuple: (numpy配列に変換された特徴量行列, numpy配列に変換されたラベル) """ # DataFrameの場合、列名を保存してnumpy配列に変換 if isinstance(X, pd.DataFrame): self.feature_names = X.columns.tolist() X_array = X.values else: # numpy配列の場合、特徴量名を自動生成 X_array = np.array(X) self.feature_names = [f'feature_{i}' for i in range(X_array.shape[1])] # Seriesの場合、numpy配列に変換 if isinstance(labels, pd.Series): labels_array = labels.values else: labels_array = np.array(labels) return X_array, labels_array def calculate_variance_ratio(self, X: Union[np.ndarray, pd.DataFrame], labels: Union[np.ndarray, pd.Series]) -> np.ndarray: """各変数の分散比を計算する 分散比 = クラスター間分散 / クラスター内分散 Args: X: 特徴量行列 labels: クラスターラベル Returns: np.ndarray: 各特徴量の分散比 """ X_array, labels_array = self._convert_input_data(X, labels) n_features = X_array.shape[1] variance_ratios = np.zeros(n_features) for feature_idx in range(n_features): feature = X_array[:, feature_idx] unique_labels = np.unique(labels_array) # クラスター間分散の計算 # 各クラスターの平均を計算 cluster_means = np.array([feature[labels_array == k].mean() for k in unique_labels]) overall_mean = feature.mean() # クラスターサイズで重み付けした平均との差の二乗和 between_variance = np.sum( [np.sum(labels_array == k) * (mean - overall_mean) ** 2 for k, mean in enumerate(cluster_means)]) # クラスター内分散の計算 # 各クラスター内でのデータ点と平均との差の二乗和 within_variance = np.sum( [np.sum((feature[labels_array == k] - cluster_means[k]) ** 2) for k in range(len(unique_labels))]) # 分散が0の場合の処理 if within_variance != 0: variance_ratios[feature_idx] = between_variance / within_variance else: variance_ratios[feature_idx] = np.inf return variance_ratios def calculate_f_values(self, X: Union[np.ndarray, pd.DataFrame], labels: Union[np.ndarray, pd.Series]) -> Dict[str, np.ndarray]: """各変数のF値関連の統計量を計算する Args: X: 特徴量行列 labels: クラスターラベル Returns: Dict: F値、p値、効果量(η²)を含む辞書 """ X_array, labels_array = self._convert_input_data(X, labels) n_features = X_array.shape[1] # 結果を格納する配列の初期化 f_values = np.zeros(n_features) p_values = np.zeros(n_features) eta_squared = np.zeros(n_features) # 各特徴量についてF値を計算 for feature_idx in range(n_features): result = self._calculate_f_value_single(X_array, labels_array, feature_idx) f_values[feature_idx] = result.f_value p_values[feature_idx] = result.p_value # 効果量(η²)の計算 eta_squared[feature_idx] = result.between_ss / (result.between_ss + result.within_ss) return { 'f_value': f_values, 'p_value': p_values, 'eta_squared': eta_squared } def _calculate_f_value_single(self, X: np.ndarray, labels: np.ndarray, feature_idx: int) -> FValueResult: """単一変数のF値を計算する Args: X: 特徴量行列 labels: クラスターラベル feature_idx: F値を計算する特徴量のインデックス Returns: FValueResult: F値の計算結果を含むデータクラス """ feature = X[:, feature_idx] unique_labels = np.unique(labels) n_clusters = len(unique_labels) n_samples = len(feature) # 全体の平均と各クラスターの統計量を計算 grand_mean = np.mean(feature) cluster_means = np.array([np.mean(feature[labels == k]) for k in unique_labels]) cluster_sizes = np.array([np.sum(labels == k) for k in unique_labels]) # グループ間平方和の計算 between_ss = np.sum(cluster_sizes * (cluster_means - grand_mean) ** 2) # グループ内平方和の計算 within_ss = np.sum([ np.sum((feature[labels == k] - cluster_means[i]) ** 2) for i, k in enumerate(unique_labels) ]) # 自由度の計算 between_df = n_clusters - 1 within_df = n_samples - n_clusters # 平均平方の計算 between_ms = between_ss / between_df within_ms = within_ss / within_df if within_df > 0 else np.inf # F値とp値の計算 f_value = between_ms / within_ms if within_ms != 0 else np.inf p_value = 1 - stats.f.cdf(f_value, between_df, within_df) if not np.isinf(f_value) else 0 return FValueResult( f_value=f_value, p_value=p_value, between_ss=between_ss, within_ss=within_ss, between_df=between_df, within_df=within_df ) def calculate_silhouette_scores(self, X: Union[np.ndarray, pd.DataFrame], labels: Union[np.ndarray, pd.Series]) -> np.ndarray: """各変数のシルエット係数を計算する Args: X: 特徴量行列 labels: クラスターラベル Returns: np.ndarray: 各特徴量のシルエット係数 """ X_array, labels_array = self._convert_input_data(X, labels) n_features = X_array.shape[1] silhouette_scores = np.zeros(n_features) # 各特徴量について個別にシルエット係数を計算 for feature_idx in range(n_features): feature = X_array[:, feature_idx].reshape(-1, 1) silhouette_scores[feature_idx] = np.mean( silhouette_samples(feature, labels_array)) return silhouette_scores def analyze_contributions(self, X: Union[np.ndarray, pd.DataFrame], labels: Union[np.ndarray, pd.Series], weights: Dict[str, float] = None) -> pd.DataFrame: """各変数の寄与度を総合的に分析する Args: X: 特徴量行列 labels: クラスターラベル weights: 各指標の重み付け係数(デフォルトは均等配分) Returns: pd.DataFrame: 各指標と総合スコアを含むDataFrame """ # デフォルトの重みを設定 if weights is None: weights = { 'variance_ratio': 1/3, 'f_value': 1/3, 'silhouette': 1/3 } # 各指標の計算 variance_ratios = self.calculate_variance_ratio(X, labels) f_results = self.calculate_f_values(X, labels) silhouette_scores = self.calculate_silhouette_scores(X, labels) # 結果をDataFrameにまとめる results_df = pd.DataFrame({ 'variance_ratio': variance_ratios, 'f_value': f_results['f_value'], 'p_value': f_results['p_value'], 'eta_squared': f_results['eta_squared'], 'silhouette': silhouette_scores }, index=self.feature_names) # スケーリング関数の定義 def scale_scores(x): return (x - x.min()) / (x.max() - x.min()) if x.max() != x.min() else np.zeros_like(x) # 各指標の正規化 scaled_df = pd.DataFrame({ 'scaled_variance_ratio': scale_scores(results_df['variance_ratio']), 'scaled_f_value': scale_scores(results_df['f_value']), 'scaled_silhouette': scale_scores(results_df['silhouette']) }) # 総合スコアの計算 results_df['composite_score'] = ( weights['variance_ratio'] * scaled_df['scaled_variance_ratio'] + weights['f_value'] * scaled_df['scaled_f_value'] + weights['silhouette'] * scaled_df['scaled_silhouette'] ) # 結果の保存と返却 self.results = results_df return results_df def get_feature_rankings(self) -> pd.DataFrame: """全特徴量について、各指標でのランキングを返す Returns: pd.DataFrame: 各指標における特徴量のランキングと値を含むDataFrame """ if self.results is None: raise ValueError("先にanalyze_contributionsを実行してください") # ランキングを計算する指標 metrics = ['variance_ratio', 'f_value', 'silhouette', 'composite_score'] # 各指標についてランキングと値のDataFrameを作成 ranked_features = {} for metric in metrics: # 値で降順ソートした特徴量名を取得 sorted_features = self.results[metric].sort_values(ascending=False) # ランキングと値を含む辞書を作成 metric_dict = { f'{metric}_rank': range(1, len(sorted_features) + 1), f'{metric}_value': sorted_features.values } # DataFrameに変換 metric_df = pd.DataFrame( metric_dict, index=sorted_features.index ) # 結果を辞書に格納 ranked_features[metric] = metric_df # 全ての結果を1つのDataFrameにまとめる result_df = pd.DataFrame(index=self.results.index) for metric in metrics: result_df[f'{metric}_rank'] = ranked_features[metric][f'{metric}_rank'] result_df[f'{metric}_value'] = ranked_features[metric][f'{metric}_value'] # インデックスを特徴量名にする result_df.index.name = 'feature' # 複合スコアでソートして返却 return result_df.sort_values('composite_score_rank')
前回の記事で欠損値を扱えるようにしたというのもあるので、こちらでも欠損値があっても寄与度を算出できるようにしてみましょう
import numpy as np import pandas as pd from scipy import stats from sklearn.metrics import silhouette_samples from typing import Union, Dict, Tuple from dataclasses import dataclass class MissingValueHandler: """欠損値の処理と補正を行うクラス 各指標の計算時に、欠損値の影響を適切に処理し、必要な補正を加えます。 """ @staticmethod def get_valid_data_mask(data: np.ndarray) -> np.ndarray: """有効なデータ(非欠損値)のマスクを取得 Args: data: 評価対象のデータ配列 Returns: np.ndarray: 有効データを示すブールマスク """ return ~np.isnan(data) @staticmethod def calculate_correction_factor(valid_ratio: float, min_ratio: float = 0.5) -> float: """データの有効率に基づく補正係数を計算 Args: valid_ratio: 有効データの割合 (0.0 ~ 1.0) min_ratio: 最小の許容可能な有効データ割合 (デフォルト: 0.5) Returns: float: 補正係数(有効データが少ないほど、スコアを減衰) """ if valid_ratio < min_ratio: return 0.0 return np.sqrt(valid_ratio) # 平方根による補正を適用 @staticmethod def adjust_score(score: float, valid_ratio: float, min_ratio: float = 0.5) -> float: """スコアを有効データ率に基づいて補正 Args: score: 元のスコア valid_ratio: 有効データの割合 min_ratio: 最小の許容可能な有効データ割合 Returns: float: 補正後のスコア """ correction = MissingValueHandler.calculate_correction_factor( valid_ratio, min_ratio) return score * correction class RobustClusterContributionAnalyzer: """欠損値に対応したクラスタリング寄与度分析クラス""" def __init__(self, min_valid_ratio: float = 0.5): """初期化 Args: min_valid_ratio: 最小の許容可能な有効データ割合 (デフォルト: 0.5) """ self.results = None self.feature_names = None self.min_valid_ratio = min_valid_ratio self.valid_ratios = None # 各特徴量の有効データ率を保存 def _preprocess_data(self, X: Union[np.ndarray, pd.DataFrame], labels: Union[np.ndarray, pd.Series]) -> Tuple[np.ndarray, np.ndarray]: """入力データの前処理と欠損値の確認 Args: X: 特徴量行列 labels: クラスターラベル Returns: Tuple[np.ndarray, np.ndarray]: 処理済みの特徴量行列とラベル """ # DataFrameの場合の処理 if isinstance(X, pd.DataFrame): self.feature_names = X.columns.tolist() X_array = X.values else: X_array = np.array(X) self.feature_names = [f'feature_{i}' for i in range(X_array.shape[1])] # ラベルの変換 if isinstance(labels, pd.Series): labels_array = labels.values else: labels_array = np.array(labels) # 各特徴量の有効データ率を計算 self.valid_ratios = np.mean(~np.isnan(X_array), axis=0) return X_array, labels_array def calculate_robust_variance_ratio(self, X: Union[np.ndarray, pd.DataFrame], labels: Union[np.ndarray, pd.Series]) -> np.ndarray: """欠損値に対応した分散比の計算 Args: X: 特徴量行列 labels: クラスターラベル Returns: np.ndarray: 各特徴量の補正済み分散比 """ X_array, labels_array = self._preprocess_data(X, labels) n_features = X_array.shape[1] variance_ratios = np.zeros(n_features) for feature_idx in range(n_features): feature = X_array[:, feature_idx] valid_mask = ~np.isnan(feature) valid_feature = feature[valid_mask] valid_labels = labels_array[valid_mask] if len(valid_feature) == 0: variance_ratios[feature_idx] = 0 continue # 有効なデータのみで分散比を計算 unique_labels = np.unique(valid_labels) cluster_means = np.array([ np.mean(valid_feature[valid_labels == k]) for k in unique_labels ]) overall_mean = np.mean(valid_feature) # クラスター間分散の計算 between_variance = np.sum([ np.sum(valid_labels == k) * (mean - overall_mean) ** 2 for k, mean in enumerate(cluster_means) ]) # クラスター内分散の計算 within_variance = np.sum([ np.sum((valid_feature[valid_labels == k] - cluster_means[i]) ** 2) for i, k in enumerate(unique_labels) ]) # 分散比の計算と補正 if within_variance != 0: ratio = between_variance / within_variance else: ratio = np.inf # 有効データ率による補正 variance_ratios[feature_idx] = MissingValueHandler.adjust_score( ratio, self.valid_ratios[feature_idx], self.min_valid_ratio) return variance_ratios def calculate_robust_f_values(self, X: Union[np.ndarray, pd.DataFrame], labels: Union[np.ndarray, pd.Series]) -> Dict[str, np.ndarray]: """欠損値に対応したF値の計算 Args: X: 特徴量行列 labels: クラスターラベル Returns: Dict: F値関連の統計量(F値、p値、効果量) """ X_array, labels_array = self._preprocess_data(X, labels) n_features = X_array.shape[1] f_values = np.zeros(n_features) p_values = np.zeros(n_features) eta_squared = np.zeros(n_features) for feature_idx in range(n_features): feature = X_array[:, feature_idx] valid_mask = ~np.isnan(feature) valid_feature = feature[valid_mask] valid_labels = labels_array[valid_mask] if len(valid_feature) == 0: continue # 有効なデータのみでF値を計算 unique_labels = np.unique(valid_labels) n_clusters = len(unique_labels) n_samples = len(valid_feature) if n_clusters < 2 or n_samples <= n_clusters: continue # グループ統計量の計算 grand_mean = np.mean(valid_feature) cluster_means = np.array([ np.mean(valid_feature[valid_labels == k]) for k in unique_labels ]) cluster_sizes = np.array([ np.sum(valid_labels == k) for k in unique_labels ]) # 平方和の計算 between_ss = np.sum( cluster_sizes * (cluster_means - grand_mean) ** 2) within_ss = np.sum([ np.sum((valid_feature[valid_labels == k] - cluster_means[i]) ** 2) for i, k in enumerate(unique_labels) ]) # 自由度の計算 between_df = n_clusters - 1 within_df = n_samples - n_clusters # F値の計算 if within_df > 0 and within_ss > 0: f_value = (between_ss / between_df) / (within_ss / within_df) p_value = 1 - stats.f.cdf(f_value, between_df, within_df) effect = between_ss / (between_ss + within_ss) else: f_value = 0 p_value = 1 effect = 0 # 有効データ率による補正 correction = MissingValueHandler.calculate_correction_factor( self.valid_ratios[feature_idx], self.min_valid_ratio) f_values[feature_idx] = f_value * correction p_values[feature_idx] = p_value eta_squared[feature_idx] = effect * correction return { 'f_value': f_values, 'p_value': p_values, 'eta_squared': eta_squared } def calculate_robust_silhouette(self, X: Union[np.ndarray, pd.DataFrame], labels: Union[np.ndarray, pd.Series]) -> np.ndarray: """欠損値に対応したシルエット係数の計算 Args: X: 特徴量行列 labels: クラスターラベル Returns: np.ndarray: 各特徴量の補正済みシルエット係数 """ X_array, labels_array = self._preprocess_data(X, labels) n_features = X_array.shape[1] silhouette_scores = np.zeros(n_features) for feature_idx in range(n_features): feature = X_array[:, feature_idx] valid_mask = ~np.isnan(feature) valid_feature = feature[valid_mask] valid_labels = labels_array[valid_mask] if len(valid_feature) == 0 or len(np.unique(valid_labels)) < 2: continue # 有効なデータのみでシルエット係数を計算 try: score = np.mean(silhouette_samples( valid_feature.reshape(-1, 1), valid_labels )) except: score = 0 # 有効データ率による補正 silhouette_scores[feature_idx] = MissingValueHandler.adjust_score( score, self.valid_ratios[feature_idx], self.min_valid_ratio) return silhouette_scores def analyze_contributions(self, X: Union[np.ndarray, pd.DataFrame], labels: Union[np.ndarray, pd.Series], weights: Dict[str, float] = None) -> pd.DataFrame: """欠損値を考慮した総合的な寄与度分析 Args: X: 特徴量行列 labels: クラスターラベル weights: 各指標の重み付け係数 Returns: pd.DataFrame: 分析結果(各指標のスコアと補正情報を含む) """ if weights is None: weights = { 'variance_ratio': 1/3, 'f_value': 1/3, 'silhouette': 1/3 } # 各指標の計算 variance_ratios = self.calculate_robust_variance_ratio(X, labels) f_results = self.calculate_robust_f_values(X, labels) silhouette_scores = self.calculate_robust_silhouette(X, labels) # 結果をDataFrameにまとめる results_df = pd.DataFrame({ 'variance_ratio': variance_ratios, 'f_value': f_results['f_value'], 'p_value': f_results['p_value'], 'eta_squared': f_results['eta_squared'], 'silhouette': silhouette_scores, 'valid_data_ratio': self.valid_ratios }, index=self.feature_names) # スケーリング関数 def scale_scores(x): return (x - x.min()) / (x.max() - x.min()) if x.max() != x.min() else np.zeros_like(x) # 各指標の正規化 scaled_df = pd.DataFrame({ 'scaled_variance_ratio': scale_scores(results_df['variance_ratio']), 'scaled_f_value': scale_scores(results_df['f_value']), 'scaled_silhouette': scale_scores(results_df['silhouette']) }) # 総合スコアの計算 results_df['composite_score'] = ( weights['variance_ratio'] * scaled_df['scaled_variance_ratio'] + weights['f_value'] * scaled_df['scaled_f_value'] + weights['silhouette'] * scaled_df['scaled_silhouette'] ) self.results = results_df return results_df def get_feature_rankings(self) -> pd.DataFrame: """全特徴量のランキングを返す(欠損値の影響を考慮済み)""" if self.results is None: raise ValueError("先にanalyze_contributionsを実行してください") metrics = ['variance_ratio', 'f_value', 'silhouette', 'composite_score'] ranked_features = {} for metric in metrics: sorted_features = self.results[metric].sort_values(ascending=False) metric_dict = { f'{metric}_rank': range(1, len(sorted_features) + 1), f'{metric}_value': sorted_features.values } ranked_features[metric] = pd.DataFrame( metric_dict, index=sorted_features.index ) result_df = pd.DataFrame(index=self.results.index)
こんな感じです。欠損バージョンを使っておけば、基本的にはどんな時でも寄与度は算出できると思います。
最後に
こんな感じで、クラスタリングにおける寄与度もどきを作ってみるということをやってみましたが、どうだったでしょうか?
クラスタリングにおいて各変数の寄与度が欲しいと思う瞬間ははたして皆さんはあるんですかね?この記事がどなたかの役に立てばうれしい限りです。最後までお読みいただきありがとうございました。