ID3 算法, 即 Iterative Dichotomiser 3(迭代二分器第三代)。算法通过迭代地选择具有最高信息增益的特征来分割数据集,从而递归地生成决策树模型,直至数据集被完美分类或无可分割的特征为止。
ID3 构建决策树的流程:
- 找到最优特征(信息增益值最高)。
- 最优特征,特征值作为有向边,判断子数据集
- 全部是同一类:输出叶子节点。
- 多类:
- 如果子数据集中还有特征,那么再次找寻最优特征。
- 如果子数据集中只剩下最后的分类结果了,那么输出最多的一类作为叶子节点。
- 直到叶子节点全部输出,或者无数据可遍历。
ID3 算法的特点:
- 只能处理离散型数据的学习计算。不能处理连续型数据。
- 容易产生过拟合现象(overfitting),泛化能力弱。
使用到的公式
- 信息熵(Entropy) H ( X ) = − ∑ i = 1 n p i log 2 p i H(X)=-\sum\limits_{i=1}^{n} p_i\log_{2}{p_i} H(X)=−i=1∑npilog2pi 其中, X X X表示样本, p i p_i pi表示概率,并且 ∑ i = 1 n p i = 1 \sum\limits_{i=1}^{n}p_i=1 i=1∑npi=1。例如投掷硬币,正面朝上的概率为1/2,反面朝上的概率为1/2,那么 H ( X ) = − ( 1 2 log 2 1 2 + 1 2 log 2 1 2 ) = 1 H(X)=-(\frac{1}{2}\log_2\frac{1}{2}+\frac{1}{2}\log_2 \frac{1}{2})=1 H(X)=−(21log221+21log221)=1。
- 条件熵(Condition Entropy) H ( Y ∣ X ) = ∑ p i H ( Y ∣ X = x i ) H(Y|X)=\sum p_iH(Y|X=x_i) H(Y∣X)=∑piH(Y∣X=xi) 表示在 X 发生的前提下,Y 的信息熵。
- 信息增益(Information Gain) G ( D ∣ A ) = H ( D ) − H ( D ∣ A ) G(D|A)=H(D)-H(D|A) G(D∣A)=H(D)−H(D∣A) 表示 A 发生时,的信息增益。
下面将按照如下步骤实现一个简单的 ID3 决策树:
- 准备训练数据。
- 从可选的特征中选出一个最佳特征作树的节点。
- 先计算数据中的初始熵。
- 然后计算每个特征下的信息熵。
- 计算各个特征的熵增,熵增最大的将作为树的节点。
- 根据选择出的特征节点,对数据进行分组,对于每个分组,计算其信息熵,选择出信息熵最小的分组值作为决策特征值,然后获取该特征值对应的y值。
- 将非最小熵值的其他分组作为树的子节点,重复 2 - 3 步骤,继续构建决策树。
实例代码如下:
import numpy as np import pandas as pd from pandas import DataFrame # 计算 D 的信息熵 def cal_h(p_arr): return sum(-p * np.log2(p) for p in p_arr) # 计算分组的信息熵 def cal_grouped_h(groups_p): return dict((k, cal_h(p_arr)) for k, p_arr in groups_p.items()) def cal_c_h(p_arr_info): """计算条件熵 :param p_arr_info:输入的概率信息,字典类型,对应的值用列表表示,列表的第一个元素表示该条件在整体样本中的概率,从第二个元素开始,表示该条件下各个分量的概率 :return: 条件熵的总和 """ return np.sum( [p_arr[0] * cal_h(p_arr[1:]) for p_arr in p_arr_info.values()]) # 根据特征值进行分组,对每个分组中 y_label 的概率进行统计 def stat_y_label_percentage(df: DataFrame, feature, y_label, is_to_dict=False): condition_stat = df.groupby(feature)[y_label].value_counts( normalize=True).to_frame(name='percentage').reset_index() if is_to_dict: return condition_stat.groupby(feature)['percentage'].apply( list).to_dict() return condition_stat # 计算各个特征的条件信息熵 def cal_features_c_h(df, features, y_label): result = {} for feature in features: # 根据特征值分组,对每个分组中 y_label 的概率进行统计 grouped_p_items = stat_y_label_percentage(df, feature, y_label, is_to_dict=True) print(f"'{feature}'的概率分布:{grouped_p_items}") # 计算各个分组占总体样本的概率 grouped_p = df[feature].value_counts(normalize=True).to_dict() # 将分组概率插入到各组概率数组的第一项中 [grouped_p_items[k].insert(0, p) for k, p in grouped_p.items()] # 计算特征下的条件熵 c_h = cal_c_h(grouped_p_items) print(f"'{feature}'的条件熵为:{c_h}") result[feature] = c_h return result # 计算熵增 def cal_gain_entropy(pre_entropy, cur_entropy: dict): return dict((k, pre_entropy - v) for k, v in cur_entropy.items()) # 熵增最大的特征,作为节点 def select_max_gain_entropy(entropy_gain_info): return max(entropy_gain_info.items(), key=lambda x: x[1])[0] # 定义决策树 class DecisionTree: y_label = None def __init__(self, df, features, y_label): self.y_label_val = None self.decision_condition_val = None self.other_y_label_val = None self.df = df self.features = features self.children = [] self.y_label = y_label def select_decision_node(self): # 计算初始信息熵 percentages = self.df[self.y_label].value_counts(normalize=True) h_D = cal_h(percentages) # 计算信息熵 info_h = cal_features_c_h(self.df, self.features, self.y_label) # 计算熵增 gain_entropy = cal_gain_entropy(h_D, info_h) print(gain_entropy) # 选择熵增最大的节点作为分割点 select_node = select_max_gain_entropy(gain_entropy) print(select_node) # 构造树的节点 self.name = select_node # 计算决策值,即获取决策值,和对应的训练值 def cal_decision_val(self): grouped_p = stat_y_label_percentage(self.df, self.name, self.y_label, True) grouped_h = cal_grouped_h(grouped_p) # 熵值最小的值作为叶子的判断条件 self.decision_condition_val = min(grouped_h.items(), key=lambda x: x[1])[0] # 决策条件下的概率数据,即 yes 和 no 的占比 decision_condition_data = self.df[self.df[self.name] == self.decision_condition_val] decision_condition_data_p = stat_y_label_percentage( decision_condition_data, self.name, self.y_label) # 概率大的作为预测值 self.y_label_val = self.df.loc[ decision_condition_data_p["percentage"].idxmax(), self.y_label] # 临时变量,当没有子节点时,预测树的另一个分支值 self.temp_other_y_label_val = "no" if self.y_label_val == 'yes' else "yes" def create_children(self): # 树的分支,需要排除已经决策的点 children_nodes = [val for val in self.df[self.name].unique() if val != self.decision_condition_val] self.features.remove(self.name) if self.features: for node in children_nodes: child_df = self.df[self.df[self.name] == node].reset_index( drop=True) child = DecisionTree(child_df, self.features, self.y_label) child.fit() self.children.append(child) # 当特征值为空时,停止添加子树 if not self.features: break # 当无特征时,直接决策 else: self.other_y_label_val = self.temp_other_y_label_val # 训练决策树 def fit(self): # 选择决策点 self.select_decision_node() # 计算当前节点的决策 self.cal_decision_val() # 递归构建树 self.create_children() # 决策 def decide(self, row): if row[self.name] == self.decision_condition_val: row[self.y_label] = self.y_label_val else: if not self.children: row[self.y_label] = self.other_y_label_val else: for child in self.children: child.decide(row) if __name__ == '__main__': df = pd.DataFrame({"outlook": ["overcast", "overcast", "overcast", "overcast", "rainy", "rainy", "rainy", "rainy", "rainy", "sunny", "sunny", "sunny", "sunny", "sunny"], "temperature": ["hot", "cool", "mild", "hot", "mild", "cool", "cool", "mild", "mild", "hot", "hot", "mild", "cool", "mild"], "humidity": ["high", "normal", "high", "normal", "high", "normal", "normal", "normal", "high", "high", "high", "high", "normal", "normal"], "windy": [ "FALSE", "TRUE", "TRUE", "FALSE", "FALSE", "FALSE", "TRUE", "FALSE", "TRUE", "FALSE", "TRUE", "FALSE", "FALSE", "TRUE"], "play": ["yes", "yes", "yes", "yes", "yes", "yes", "no", "yes", "no", "no", "no", "no", "yes", "yes"]}) tree = DecisionTree(df, ["outlook", "temperature", "humidity", "windy"], "play") # 训练 tree.fit() test_df = df = pd.DataFrame({"outlook": ["rainy", "sunny"], "temperature": ["hot", "cool"], "humidity": ["high", "high"], "windy": ["TRUE", "TRUE"], "play": ["", ""]}) # 决策 for _, row in df.iterrows(): tree.decide(row) print(test_df)
上述代码是一个简单的 ID3 决策树实现,你可以根据实际情况,对测试数据进行适当的修改,然后使用决策树进行训练和预测。
还没有评论,来说两句吧...