天天看点

对《HYDRA:Massively Compositional Model for Cross-Project Defect Prediction》的复现-GA phase

论文《HYDRA:Massively Compositional Model for Cross-Project Defect Prediction》(以下简称论文),发表于2016的TSE。其模型算法主要分为两个阶段(GA phase & EL phase),本篇主要讲GA phase的复现。

  • GA phase:对源数据集 S=[S1,S2,...,SN] S = [ S 1 , S 2 , . . . , S N ] 以及目标数据集T,训练 N N 个底层分类器(底层分类器均使用Logic Regression)。第ii个底层分类器使用的训练集为 Si∪T^ S i ∪ T ^ ,其中 T^ T ^ 是从 T T 中抽取的部分(10%)带有标签的样本,剩余的90%作为测试集。对于TT中90%的样本,同时作为第 N+1 N + 1 个底层分类器的训练集与测试集,训练得到第 N+1 N + 1 个底层分类器。此时共计得到 N+1 N + 1 个底层分类器,记为 clf_list=[clf1,clf2,...,clfN+1] c l f _ l i s t = [ c l f 1 , c l f 2 , . . . , c l f N + 1 ] 。为了将 clf_list c l f _ l i s t 中的底层分类器集成为GA分类器,需要 N+1 N + 1 个权值以及一个阈值。GA阶段就是通过遗传算法,得到最优的(权值+阈值)基因。

Genetic Algorithm

以下介绍遗传算法,用伪代码的形式展现:

对《HYDRA:Massively Compositional Model for Cross-Project Defect Prediction》的复现-GA phase

以下介绍在选择、交叉、变异阶段用到的算法

  1. 轮盘赌选择(Roulette wheel selection)

      轮盘赌选择是最常用的选择算子,来源于实际生活中的掷飞镖游戏:拥有多个扇形区域的轮盘,每个区域占有一定的面积,游戏者随机投掷一枚飞镖,该飞镖飞中的区域,即为被选中的区域。如果要将上述游戏变成代码,首先需要得到拥有各个扇形区域的圆盘。在GA中,也即求出每个个体能够生存下去的概率。在遗传算法中,每个个体会有一个适宜度评分 fitness f i t n e s s (论文中用 f1 f 1 作为 fitness f i t n e s s ),个体 i i 的存活率pi=fitnessi∑Mj=1fitnessjpi=fitnessi∑j=1Mfitnessj。对于个体数为 M M 的种群,求出所有个体的存活率还不好使,因为无法通过这个概率列表进行“投掷”飞镖。为此,可以将轮盘沿着某个扇形的边剪开,得到一条线段,此时所有个体的存活率便体现在这条[0,1][0,1]的线段上。为了更好的确定投掷区间,此时需要计算每个个体的累积概率, qi=∑ij=1pj q i = ∑ j = 1 i p j 。至此,就将个体数为 M M 的种群投影到了[0,1][0,1]的线段上,随机一个 [0,1] [ 0 , 1 ] 的数字,便能在这条线段上选择一个个体。

  2. 单点交叉(Single point crossover)

      单点交叉算子是模拟生物界染色体上基因交叉,可以说是一种仿生技术。本文采取单点交叉,也即对于要配对的基因对(基因与个体一对一的关系,一个个体只含有一条基因),随机选择一个基因序号,从此序号之后的基因与搭档交换,得到一对新的个体。

  3. 随机变异(Random mutation)

      与单点交叉类似,对于一个个体,随机选择一个基因序号,重新给该序号上的基因赋上一随机值(取值范围之内),得到一个新的个体。

GA phase的实现过程

在介绍了GA算法之后,如何将其用于软件缺陷预测呢?前面我们已经得到了 clf_list c l f _ l i s t ,现在我们的目标是如何给这些底层分类器分配权值,以及这 N+1 N + 1 个分类器通过何种方式,对于某一样例进行预测。

我们不妨假设,现在已经拥有 N+1 N + 1 个权值,该权值列表记为 wight_list w i g h t _ l i s t ,下面给出判断样例 j j 是正例还是反例的计算公式:

Label(j)=⎧⎩⎨1(i.e.,defective),0(i.e.,defectfree),if Comp(j) >= thresholdotherwiseLabel(j)={1(i.e.,defective),if Comp(j) >= threshold0(i.e.,defectfree),otherwise

其中,

Comp(j)=∑N+1i=1αi∗Scorei(j)LOC(j) C o m p ( j ) = ∑ i = 1 N + 1 α i ∗ S c o r e i ( j ) L O C ( j )

从公式中可以看到,判断一个样例是正例还是反例,会先通过底层分类器进行判断,然后加权累加,得到的 Comp C o m p 与阈值进行比较。由此可知,阈值的取值范围不再是 [0,1] [ 0 , 1 ] ,而是 [0,N+1] [ 0 , N + 1 ] 。

此时我们便能确定基因的序列了。论文中的基因长度为 N+2 N + 2 ,其中,前 N+1 N + 1 项的取值范围为 [0,1] [ 0 , 1 ] ,第 N+2 N + 2 项的取值范围为 [0,N+1] [ 0 , N + 1 ] 。

确定了基因序列,便能用GA算法生成最优的权值与阈值了。详细代码,见 github

此处贴出GA phase 主要代码

# Genetic Algorithm
from sklearn.linear_model import LogisticRegression
import random
import numpy as np
from Tools.EvaluationTool import EvaluationTool


class GA:
def __init__(self, pop_size, max_gen=100, crossover_rate=0.6, mutation_rate=0.1):
    # 初始化
    self.pop_size = pop_size  # 种群数
    self.max_gen = max_gen  # 最大繁殖代数
    self.crossover_rate = crossover_rate  # 交叉率
    self.mutation_rate = mutation_rate  # 变异率
    self.genes_num = None  # 基因数目
    self.clf_list = None
    self.target_data = None
    self.target_label = None
    self.pop_f1_list = []  # 记录种群中每个个体的f1值,避免重复计算

def fit(self, source_data_list, source_label_list, target_data, target_label):
    # 得到一众基础分类器
    clf_list = self.get_base_clf(source_data_list, source_label_list, target_data, target_label)
    self.clf_list = clf_list
    # 个体->染色体->list of genes
    # 确定基因的个数 = 分类器个数 + 阈值
    genes_num = len(clf_list) + 1
    self.genes_num = genes_num
    # 初始化种群
    pop = np.random.random((self.pop_size, self.genes_num))
    # 最后一列阈值的取值范围由(0,1)调整到(0,genes_num-1)
    pop[:, -1] = pop[:, -1] * (genes_num - 1)
    # 计算初始群体中的最优解
    pre_solution, pre_f1 = self.get_best_from_pop(pop)
    # 繁殖
    cur_gen = 0  # 当前代数为0
    while cur_gen < self.max_gen:
        temp_pop = pop.copy()
        pop_new = self.ga_generation(temp_pop)
        cur_solution, cur_f1 = self.get_best_from_pop(pop_new)
        if cur_f1 > pre_f1:
            pre_f1 = cur_f1
            pre_solution = cur_solution
        cur_gen += 1
    print(pre_f1)
    return pre_solution, pre_f1

# 训练得到一众基础分类器
def get_base_clf(self, source_data_list, source_label_list, target_data, target_label):
    # 将target中一部分样例添加到source中,当做训练集
    sdl, sll, td, tl = self.transfer_target_to_source(source_data_list, source_label_list,
                                                      target_data, target_label)
    self.target_data, self.target_label = td, tl
    clf_list = []
    for index in range(len(source_data_list)):
        # 论文中用的是逻辑回归,感觉效果不是很好
        clf = LogisticRegression()
        # 换成C4.5
        # clf = tree.DecisionTreeClassifier(criterion='entropy', splitter='random', max_depth=30)
        clf.fit(sdl[index], sll[index])
        clf_list.append(clf)
    # 使用target训练,得到(N+1)th clf
    clf = LogisticRegression()
    clf.fit(td, tl)
    clf_list.append(clf)
    return clf_list

# 从target中转移部分样例到source中
# 20180806 将for循环改成列表推导式
@staticmethod
def transfer_target_to_source(source_data_list, source_label_list, target_data, target_label):
    # 随机移除10%的样例
    t_sample_num = target_data.shape[0]
    t_remove_num = round(t_sample_num * 0.1)  # 四舍五入取整
    t_remove_index = random.sample(range(0, t_sample_num), t_remove_num)
    t_other_index = [i for i in range(0, t_sample_num) if i not in t_remove_index]
    # 新的target
    target_data_new = np.array([target_data[i, :] for i in t_other_index])
    target_label_new = np.array([target_label[i] for i in t_other_index])
    # 移出的样例以及标签
    temp_data = np.array([target_data[i, :] for i in t_remove_index])
    temp_label = np.array([target_label[i] for i in t_remove_index])
    # 新的source
    temp_data_list = [np.append(source_data_list[i], temp_data, axis=0) for i in range(len(source_data_list))]
    temp_label_list = [np.append(source_label_list[i], temp_label, axis=0) for i in range(len(source_label_list))]
    return temp_data_list, temp_label_list, target_data_new, target_label_new

# 从种群中选出最优的个体(染色体)
def get_best_from_pop(self, pop):
    if len(self.pop_f1_list) != 0:
        self.pop_f1_list.clear()
    # 比较每一个个体的F1值
    pre_f1, best_index = 0, 0
    for index in range(pop.shape[0]):
        cur_f1 = self.score(pop[index])
        self.pop_f1_list.append(cur_f1)
        if cur_f1 > pre_f1:
            pre_f1 = cur_f1
            best_index = index
    return pop[best_index], pre_f1

# 预测目标数据集的标签并计算分数
def score(self, pop_item):
    # 计算target中每一个样例的得分
    predict_label = []
    for sample in range(self.target_data.shape[0]):
        # comp = from i to N+1 (wight * clf prediction) / loc(sample)
        comp = 0
        for i in range(len(self.clf_list)):
            comp += pop_item[i] * self.clf_list[i].predict(self.target_data[sample].reshape(1, -1))[0]
        # comp /= self.target_data[sample, 0]
        if comp >= pop_item[-1]:
            predict_label.append(1)
        else:
            predict_label.append(0)
    score = EvaluationTool.cal_f1(np.array(predict_label), self.target_label)
    return score

# 繁殖迭代
def ga_generation(self, pop):
    # 选择阶段
    pop_select = self.select(pop)
    # 交叉阶段
    pop_cross = self.crossover(pop_select)
    # 变异阶段
    pop_mutation = self.mutation(pop_cross)
    return np.array(pop_mutation)

# 选择
# 20180807 只修改到这里 下次从这里开始
def select(self, pop):
    fit_list, q_list = [], []  # 适应度列表,累积概率列表
    choose_index = set()  # 被选中的个体
    fit_sum = sum(self.pop_f1_list)   # 适应度总和
    p_list_array = np.divide(np.array(self.pop_f1_list), fit_sum)  # 遗传到下一代概率列表
    for i in range(len(p_list_array)):
        # 计算每个个体的累积概率
        q = 0
        for j in range(i+1):
            q += p_list_array[j]
        q_list.append(q)
    # 产生一个随机数列表,用于选择
    rand_list = np.random.rand(pop.shape[0])
    for i in range(len(rand_list)):
        choose_index.add(self.get_index_from_list(rand_list[i], q_list))
    pop_new = [pop[i] for i in choose_index]
    return np.array(pop_new)

# 从目标列表中,返回传入参数所对应的位置,传入参数应位于两个值之间
@staticmethod
def get_index_from_list(num, target_list):
    for i in range(len(target_list)):
        if i == 0 and num <= target_list[0]:
            return 0
        else:
            if target_list[i-1] < num <= target_list[i]:
                return i

# 交叉
def crossover(self, pop):
    son_list = []
    pair_num = int(pop.shape[0]/2)
    for i in range(pair_num):
        rand_num = random.random()
        if rand_num < self.crossover_rate:
            # 随机选取交叉位置
            rand_cross_index = random.randint(0, pop.shape[1]-1)  # ???是否减一
            # 交叉并产生新子代
            parent_a = pop.copy()[i*2, :]
            parent_b = pop.copy()[i*2+1, :]
            temp_parent_a = parent_a.copy()[rand_cross_index:]
            parent_a[rand_cross_index:] = parent_b[rand_cross_index:]  # 新子代a
            parent_b[rand_cross_index:] = temp_parent_a  # 新子代b
            son_list.append(parent_a)
            son_list.append(parent_b)
    if len(son_list) != 0:
        pop_new = np.append(pop, np.array(son_list), axis=0)
    else:
        pop_new = pop
    return pop_new

# 变异
def mutation(self, pop):
    son_list = []
    for i in range(pop.shape[0]):
        rand_num = random.random()
        if rand_num < self.mutation_rate:
            # 随机产生变异位置
            rand_mutation_index = random.randint(0, pop.shape[1]-1)  # ???是否减一
            # 变异产生新子代
            parent = pop.copy()[i, :]
            if rand_mutation_index == pop.shape[1]-1:
                # 最后一列变异
                r = random.random()*(self.genes_num-1)
                parent[rand_mutation_index] = r
            else:
                # 其他列变异
                r = random.random()
                parent[rand_mutation_index] = r
            son_list.append(parent)
    if len(son_list) != 0:
        pop_new = np.append(pop, np.array(son_list), axis=0)
    else:
        pop_new = pop
    return pop_new
           

继续阅读