
H2O Sparkling Water (PySparkling) on Spark

1. Prerequisites:

  • Linux/OS X/Windows
  • Java 8+
  • Python 2.7+ for the Python version of Sparkling Water (PySparkling)
  • Spark 2.3, with the SPARK_HOME shell variable pointing to your local Spark installation

2. Install the matching version of h2o_pysparkling

The 2.3 in the package name corresponds to the installed Spark version; keep the two versions consistent to avoid errors. A quick way to sanity-check the installation is sketched after the pip command below.

pip install h2o_pysparkling_2.3==2.3.0
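An optional verification step (a minimal sketch, assuming pyspark and h2o_pysparkling_2.3 are installed in the same Python environment) to confirm the versions and environment line up:

import os
import pyspark

print(pyspark.__version__)                    # should start with 2.3 to match h2o_pysparkling_2.3
print(os.environ.get("SPARK_HOME"))           # must point to the local Spark installation
from pysparkling import H2OConf, H2OContext   # fails if the package is missing or incompatible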

3. Complete code:

from pysparkling import *
from h2o.grid.grid_search import H2OGridSearch
from h2o.estimators.gbm import H2OGradientBoostingEstimator
from pyspark.sql import SparkSession

# ss is the active SparkSession used throughout this snippet
ss = SparkSession.builder.getOrCreate()

# External backend: the H2O cluster runs outside the Spark executors and does not use
# Spark cluster resources for computation; spark.dynamicAllocation.enabled must be set to false.
conf = (H2OConf(ss)
        .set_external_cluster_mode()
        .set_user_name('jerry')
        .use_auto_cluster_start()
        .set_num_of_external_h2o_nodes(1)
        .set('HADOOP_USER_NAME', 'jerry')
        .set_h2o_cluster("10.111.23.70", 54321)
        .set_client_ip("10.111.23.70")
        .set_h2o_driver_path("/home/dp/h2odriver/h2odriver-sw2.3.18-hdp2.6-extended.jar")
        .set_mapper_xmx("2G")
        .set_yarn_queue("default")
        .set_cloud_name("h2o_gbm"))

# Internal backend: the H2O cluster runs inside the Spark executors, so H2O jobs are
# distributed across the Spark cluster and computed with its resources.
# Use either the external or the internal backend, not both.
# For the internal backend the default settings are sufficient.
conf = H2OConf(ss)
conf.set_num_h2o_workers(2)

# Create the H2OContext
hc = H2OContext.getOrCreate(ss, conf)
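The rest of the listing uses a few objects from the surrounding class that the article does not show: a Spark DataFrame df with its row count df_count, the label column name self.y_col, and a cumulative-importance ratio threshold. A minimal sketch of how they might be prepared (the path and values below are placeholders, not from the original):

# Hypothetical inputs assumed by the rest of the snippet
df = ss.read.parquet("hdfs:///path/to/training_data")  # placeholder path
df_count = df.count()
# In the original code the label column name is an attribute of the enclosing class
# (self.y_col); the comments below refer to it as GEO_Y.
threshold = 0.8  # placeholder share of non-zero-importance columns to keep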

# Downsample to at most ~30,000 rows before handing the data to H2O
if df_count and df_count > 30000:
    percent = 30000.0 / df_count
    # df = df.randomSplit(weights=[1 - percent, percent], seed=1234)[1]
    df = df.sample(fraction=percent, seed=1234)

# Convert the Spark DataFrame to an H2OFrame
h2o_df = hc.as_h2o_frame(df, framename="df_h2o")

# Split the label column off from the feature columns
y_label_h2o_df = h2o_df[self.y_col]
rest_h2o_df = h2o_df.drop(self.y_col)

# Collect the feature column names (label already dropped); string columns will be
# converted to factors (enums)
h2o_rest_allcols_no_label = rest_h2o_df.columns

# -------------------------- encode the string columns --------------------------
# H2OFrame that will hold the string columns
str_h2o_df = None
# Names of the string columns
str_list = list()
if len(h2o_df.types) > 0:
    # Collect the names of the string columns
    str_list = [k for k, v in rest_h2o_df.types.items() if v == 'string']
    if len(str_list) > 0:
        # If there are string columns, convert each of them to a factor
        str_h2o_df = rest_h2o_df[str_list].asfactor()

# Concatenate the factor-encoded string columns with the remaining numeric columns
if str_h2o_df is not None:
    # Find the remaining numeric columns
    rest_list = [i for i in h2o_rest_allcols_no_label if i not in str_list]
    if len(rest_list) > 0:
        # Numeric columns exist: merge them with the (encoded) string columns
        rest_h2o_df = rest_h2o_df[rest_list].concat(str_h2o_df)
    else:
        # Otherwise the frame (without the label column GEO_Y) holds only the encoded
        # string columns
        rest_h2o_df = str_h2o_df
# If there were no string columns, nothing more to do: after dropping the label column
# GEO_Y, every remaining feature is numeric and needs no encoding.
# -------------------------- end of string-column encoding ----------------------

# The label column must be converted to a factor (categorical target)
y_label_h2o_df = y_label_h2o_df.asfactor()

# Re-attach the factor label column to the encoded feature columns
h2o_df = y_label_h2o_df.concat(rest_h2o_df)

predictors = h2o_df.names[:]

# Split into 70% training / 30% validation frames
# ratios = [0.6, 0.2]
frs = h2o_df.split_frame(ratios=[.7], seed=12345)
train = frs[0]
train.frame_id = "Train"
valid = frs[1]
valid.frame_id = "Validation"

# The label column must not appear in the predictor list
if self.y_col and self.y_col in predictors:
    predictors.remove(self.y_col)

# ====================== modified =========================
# Previous parameter set:
# 'learn_rate': 0.1, 'max_depth': 15, 'ntrees': 100, 'stopping_metric': 'AUTO'
# Grid search over the hyperparameter combinations below
gbm_param_grid = {
    'learn_rate': [0.06],  # default = 0.1
    'max_depth': [5, 9],
    # 'max_depth': [7],
    'stopping_metric': ['logloss'],
}
gbm_grid1 = H2OGridSearch(model=H2OGradientBoostingEstimator,
                          hyper_params=gbm_param_grid)
gbm_grid1.train(x=predictors, y=self.y_col, training_frame=train,
                validation_frame=valid, ntrees=100, seed=1)

# Rank the grid models by validation AUC, best first
model_rf = gbm_grid1.get_grid(sort_by='auc', decreasing=True)

best_rf1 = model_rf.models[0]
# print(best_rf1)
best_rf1_perg1 = best_rf1.model_performance(valid)
print('================= validation-set performance =================', best_rf1_perg1)
print('================= best hyperparameter combination =================',
      model_rf.get_hyperparams_dict(model_rf.model_ids[0]))
# print(best_rf1.varimp())

# Collect {column name: percentage importance} from the best model's variable importances
importance_cols = dict()
for i in best_rf1.varimp():
    # ====================== modified =========================
    # importance_cols.setdefault(i[0], i[3])
    # value = i[3]
    # if 'e' in str(i[3]):
    #     arr = str(i[3]).split('e')
    #     value = float(arr[0]) * math.pow(math.e, int(arr[1]))
    importance_cols.setdefault(i[0], i[3])

if len(importance_cols) > 0:
    # Count the columns with non-zero importance
    no_nan_len = 0
    for k, v in importance_cols.items():
        if v > 0:
            no_nan_len = no_nan_len + 1
    # Walk the columns from most to least important and keep them until the requested
    # share (threshold) of the non-zero-importance columns has been collected
    cumulative_importance_threshold = 0
    cumulative_importance_list = list()
    for k, v in sorted(importance_cols.items(), key=lambda x: x[1], reverse=True):
        if cumulative_importance_threshold <= no_nan_len * threshold:
            cumulative_importance_list.append(k)
            cumulative_importance_threshold = cumulative_importance_threshold + 1
    self.low_importance_list = cumulative_importance_list
    # print('============ importance ========', no_nan_len, '===== ', len(cumulative_importance_list))

return importance_cols