天天看點

用 Mars Remote API 輕松分布式執行 Python 函數

Mars 是一個并行和分布式 Python 架構,能輕松把單機大家耳熟能詳的的 numpy、pandas、scikit-learn 等庫,以及 Python 函數利用多核或者多機加速。這其中,并行和分布式 Python 函數主要利用 Mars Remote API。

啟動 Mars 分布式環境可以參考:

  1. 指令行方式在叢集中部署
  2. Kubernetes 中部署
  3. MaxCompute 開箱即用的環境 ,購買了 MaxCompute 服務的可以直接使用。

如何使用 Mars Remote API

使用 Mars Remote API 非常簡單,隻需要對原有的代碼做少許改動,就可以分布式執行。

拿用蒙特卡洛方法計算 π 為例。代碼如下,我們編寫了兩個函數,

calc_chunk

用來計算每個分片内落在圓内的點的個數,

calc_pi

用來把多個分片

calc_chunk

計算的結果彙總最後得出 π 值。

from typing import List
import numpy as np

def calc_chunk(n: int, i: int):
    # 計算n個随機點(x和y軸落在-1到1之間)到原點距離小于1的點的個數
    rs = np.random.RandomState(i)
    a = rs.uniform(-1, 1, size=(n, 2))
    d = np.linalg.norm(a, axis=1)
    return (d < 1).sum()

def calc_pi(fs: List[int], N: int):
    # 将若幹次 calc_chunk 計算的結果彙總,計算 pi 的值
    return sum(fs) * 4 / N

N = 200_000_000
n = 10_000_000

fs = [calc_chunk(n, i)
      for i in range(N // n)]
pi = calc_pi(fs, N)
print(pi)           

%%time

下可以看到結果:

3.1416312
CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s
Wall time: 12.3 s           

在單機需要 12.3 s。

要讓這個計算使用 Mars Remote API 并行起來,我們不需要對函數做任何改動,需要變動的僅僅是最後部分。

import mars.remote as mr

# 函數調用改成 mars.remote.spawn
fs = [mr.spawn(calc_chunk, args=(n, i))
      for i in range(N // n)]
# 把 spawn 的清單傳入作為參數,再 spawn 新的函數
pi = mr.spawn(calc_pi, args=(fs, N))
# 通過 execute() 觸發執行,fetch() 擷取結果
print(pi.execute().fetch())           

%%time

下看到結果:

3.1416312
CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms
Wall time: 2.85 s           

結果一模一樣,但是卻有數倍的性能提升。

可以看到,對已有的 Python 代碼,Mars remote API 幾乎不需要做多少改動,就能有效并行和分布式來加速執行過程。

一個例子

為了讓讀者了解 Mars Remote API 的作用,我們從另一個例子開始。現在我們有一個資料集,我們希望對它們做一個分類任務。要做分類,我們有很多算法和庫可以選擇,這裡我們用 RandomForest、LogisticRegression,以及 XGBoost。

困難的地方是,除了有多個模型選擇,這些模型也會包含多個超參,那哪個超參效果最好呢?對于調參不那麼有經驗的同學,跑過了才知道。是以,我們希望能生成一堆可選的超參,然後把他們都跑一遍,看看效果。

準備資料

這個例子裡我們使用

otto 資料集

首先,我們準備資料。讀取資料後,我們按 2:1 的比例把資料分成訓練集和測試集。

import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

def gen_data():
    df = pd.read_csv('otto/train.csv')
    
    X = df.drop(['target', 'id'], axis=1)
    y = df['target']
    
    label_encoder = LabelEncoder()
    label_encoder.fit(y)
    y = label_encoder.transform(y)
    
    return train_test_split(X, y, test_size=0.33, random_state=123)

X_train, X_test, y_train, y_test = gen_data()           

模型

接着,我們使用 scikit-learn 的 RandomForest 和 LogisticRegression 來處理分類。

RandomForest:

from sklearn.ensemble import RandomForestClassifier

def random_forest(X_train: pd.DataFrame, 
                  y_train: pd.Series, 
                  verbose: bool = False,
                  **kw):
    model = RandomForestClassifier(verbose=verbose, **kw)
    model.fit(X_train, y_train)
    return model           

接着,我們生成供 RandomForest 使用的超參,我們用 yield 的方式來疊代傳回。

def gen_random_forest_parameters():
    for n_estimators in [50, 100, 600]:
        for max_depth in [None, 3, 15]:
            for criterion in ['gini', 'entropy']:
                yield {
                    'n_estimators': n_estimators,
                    'max_depth': max_depth,
                    'criterion': criterion
                }           

LogisticRegression 也是這個過程。我們先定義模型。

from sklearn.linear_model import LogisticRegression

def logistic_regression(X_train: pd.DataFrame,
                        y_train: pd.Series,
                        verbose: bool = False,
                        **kw):
    model = LogisticRegression(verbose=verbose, **kw)
    model.fit(X_train, y_train)
    return model           

接着生成供 LogisticRegression 使用的超參。

def gen_lr_parameters():
    for penalty in ['l2', 'none']:
        for tol in [0.1, 0.01, 1e-4]:
            yield {
                'penalty': penalty,
                'tol': tol
            }           

XGBoost 也是一樣,我們用

XGBClassifier

來執行分類任務。

from xgboost import XGBClassifier

def xgb(X_train: pd.DataFrame,
        y_train: pd.Series,
        verbose: bool = False,
        **kw):
    model = XGBClassifier(verbosity=int(verbose), **kw)
    model.fit(X_train, y_train)
    return model           

生成一系列超參。

def gen_xgb_parameters():
    for n_estimators in [100, 600]:
        for criterion in ['gini', 'entropy']:
            for learning_rate in [0.001, 0.1, 0.5]:
                yield {
                    'n_estimators': n_estimators,
                    'criterion': criterion,
                    'learning_rate': learning_rate
                }           

驗證

接着我們編寫驗證邏輯,這裡我們使用

log_loss

來作為評價函數。

from sklearn.metrics import log_loss

def metric_model(model, 
                 X_test: pd.DataFrame,
                 y_test: pd.Series) -> float:
    if isinstance(model, bytes):
        model = pickle.loads(model)
    y_pred = model.predict_proba(X_test)
    return log_loss(y_test, y_pred)


def train_and_metric(train_func,
                     train_params: dict,
                     X_train: pd.DataFrame, 
                     y_train: pd.Series, 
                     X_test: pd.DataFrame, 
                     y_test: pd.Series,
                     verbose: bool = False
                     ):
    # 把訓練和驗證封裝到一起
    model = train_func(X_train, y_train, verbose=verbose, **train_params)
    metric = metric_model(model, X_test, y_test)
    return model, metric           

找出最好的模型

做好準備工作後,我們就開始來跑模型了。針對每個模型,我們把每次生成的超參們送進去訓練,除了這些超參,我們還把

n_jobs

設成 -1,這樣能更好利用單機的多核。

results = []

# -------------
# Random Forest
# -------------

for params in gen_random_forest_parameters():
    print(f'calculating on {params}')
    # fixed random_state
    params['random_state'] = 123
    # use all CPU cores
    params['n_jobs'] = -1
    model, metric = train_and_metric(random_forest, params,
                                     X_train, y_train,
                                     X_test, y_test)
    print(f'metric: {metric}')
    results.append({'model': model, 
                    'metric': metric})
    
# -------------------
# Logistic Regression
# -------------------

for params in gen_lr_parameters():
    print(f'calculating on {params}')
    # fixed random_state
    params['random_state'] = 123
    # use all CPU cores
    params['n_jobs'] = -1
    model, metric = train_and_metric(logistic_regression, params,
                                     X_train, y_train,
                                     X_test, y_test)
    print(f'metric: {metric}')
    results.append({'model': model, 
                    'metric': metric})
    
# -------
# XGBoost
# -------
    
for params in gen_xgb_parameters():
    print(f'calculating on {params}')
    # fixed random_state
    params['random_state'] = 123
    # use all CPU cores
    params['n_jobs'] = -1
    model, metric = train_and_metric(xgb, params,
                                     X_train, y_train,
                                     X_test, y_test)
    print(f'metric: {metric}')
    results.append({'model': model, 
                    'metric': metric})           

運作一下,需要相當長時間,我們省略掉一部分輸出内容。

calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'gini'}
metric: 0.6964123781828575
calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'entropy'}
metric: 0.6912312790832288
# 省略其他模型的輸出結果
CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s
Wall time: 31min 44s           

從 CPU 時間和 Wall 時間,能看出來這些訓練還是充分利用了多核的性能。但整個過程還是花費了 31 分鐘。

使用 Remote API 分布式加速

現在我們嘗試使用 Remote API 通過分布式方式加速整個過程。

叢集方面,我們使用最開始說的第三種方式,直接在 MaxCompute 上拉起一個叢集。大家可以選擇其他方式,效果是一樣的。

n_cores = 8
mem = 2 * n_cores  # 16G
# o 是 MaxCompute 入口,這裡建立 10 個 worker 的叢集,每個 worker 8核16G
cluster = o.create_mars_cluster(10, n_cores, mem, image='extended')           

為了友善在分布式讀取資料,我們對資料處理稍作改動,把資料上傳到 MaxCompute 資源。對于其他環境,使用者可以考慮 HDFS、Aliyun OSS 或者 Amazon S3 等存儲。

if not o.exist_resource('otto_train.csv'):
    with open('otto/train.csv') as f:
        # 上傳資源
        o.create_resource('otto_train.csv', 'file', fileobj=f)
        
def gen_data():
    # 改成從資源讀取
    df = pd.read_csv(o.open_resource('otto_train.csv'))
    
    X = df.drop(['target', 'id'], axis=1)
    y = df['target']
    
    label_encoder = LabelEncoder()
    label_encoder.fit(y)
    y = label_encoder.transform(y)
    
    return train_test_split(X, y, test_size=0.33, random_state=123)           

稍作改動之後,我們使用

mars.remote.spawn

方法來讓

gen_data

排程到叢集上運作。

import mars.remote as mr

# n_output 說明是 4 輸出
# execute() 執行後,資料會讀取到 Mars 叢集内部
data = mr.ExecutableTuple(mr.spawn(gen_data, n_output=4)).execute()
# remote_ 開頭的都是 Mars 對象,這時候資料在叢集内,這些對象隻是引用
remote_X_train, remote_X_test, remote_y_train, remote_y_test = data           

目前 Mars 能正确序列化 numpy ndarray、pandas DataFrame 等,還不能序列化模型,是以,我們要對

train_and_metric

稍作改動,把模型 pickle 了之後再傳回。

def distributed_train_and_metric(train_func,
                                 train_params: dict,
                                 X_train: pd.DataFrame, 
                                 y_train: pd.Series, 
                                 X_test: pd.DataFrame, 
                                 y_test: pd.Series,
                                 verbose: bool = False
                                 ):
    model, metric = train_and_metric(train_func, train_params,
                                     X_train, y_train, 
                                     X_test, y_test, verbose=verbose)
    return pickle.dumps(model), metric           

後續 Mars 支援了序列化模型後可以直接 spawn 原本的函數。

接着我們就對前面的執行過程稍作改動,把函數調用全部都用

mars.remote.spawn

來改寫。

import numpy as np

tasks = []
models = []
metrics = []

# -------------
# Random Forest
# -------------

for params in gen_random_forest_parameters():
    # fixed random_state
    params['random_state'] = 123
    task = mr.spawn(distributed_train_and_metric,
        args=(random_forest, params,
              remote_X_train, remote_y_train,
              remote_X_test, remote_y_test), 
        kwargs={'verbose': 2},
        n_output=2
    )
    tasks.extend(task)
    # 把模型和評價分别存儲
    models.append(task[0])
    metrics.append(task[1])
    
    
# -------------------
# Logistic Regression
# -------------------

for params in gen_lr_parameters():
    # fixed random_state
    params['random_state'] = 123
    task = mr.spawn(distributed_train_and_metric,
        args=(logistic_regression, params,
              remote_X_train, remote_y_train,
              remote_X_test, remote_y_test), 
        kwargs={'verbose': 2},
        n_output=2
    )
    tasks.extend(task)
    # 把模型和評價分别存儲
    models.append(task[0])
    metrics.append(task[1])

# -------
# XGBoost
# -------
    
for params in gen_xgb_parameters():
    # fixed random_state
    params['random_state'] = 123
    # 再指定并發為核的個數
    params['n_jobs'] = n_cores
    task = mr.spawn(distributed_train_and_metric,
        args=(xgb, params,
              remote_X_train, remote_y_train,
              remote_X_test, remote_y_test), 
        kwargs={'verbose': 2},
        n_output=2
    )
    tasks.extend(task)
    # 把模型和評價分别存儲
    models.append(task[0])
    metrics.append(task[1])


# 把順序打亂,目的是能分散到 worker 上平均一點
shuffled_tasks = np.random.permutation(tasks)
_ = mr.ExecutableTuple(shuffled_tasks).execute()           

可以看到代碼幾乎一緻。

運作檢視結果:

CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms
Wall time: 1min 59s           

時間一下子從 31 分鐘多來到了 2 分鐘,提升 15x+。但代碼修改的代價可以忽略不計。

細心的讀者可能注意到了,分布式運作的代碼中,我們把模型的 verbose 給打開了,在分布式環境下,因為這些函數遠端執行,列印的内容隻會輸出到 worker 的标準輸出流,我們在用戶端不會看到列印的結果,但 Mars 提供了一個非常有用的接口來讓我們檢視每個模型運作時的輸出。

以第0個模型為例,我們可以在 Mars 對象上直接調用

fetch_log

方法。

print(models[0].fetch_log())           

輸出我們簡略一部分。

building tree 1 of 50
building tree 2 of 50
building tree 3 of 50
building tree 4 of 50
building tree 5 of 50
building tree 6 of 50
# 中間省略
building tree 49 of 50
building tree 50 of 50           

要看哪個模型都可以通過這種方式。試想下,如果沒有

fetch_log

API,你确想看中間過程的輸出有多麻煩。首先這個函數在哪個 worker 上執行,不得而知;然後,即便知道是哪個 worker,因為每個 worker 上可能有多個函數執行,這些輸出就可能混雜在一起,甚至被龐大日志淹沒了。

fetch_log

接口讓使用者不需要關心在哪個 worker 上執行,也不用擔心日志混合在一起。

想要了解

fetch_log

接口,可以檢視

文檔

還有更多

Mars Remote API 的能力其實不止這些,舉個例子,在 remote 内部可以 spawn 新的函數;也可以調用 Mars tensor、DataFrame 或者 learn 的算法。這些内容,讀者們可以先行探索,後續我們再寫别的文章介紹。

總結

Mars Remote API 通過并行和分布式 Python 函數,用很小的修改代價,極大提升了執行效率。

對 Mars 項目感興趣的讀者們,歡迎 star Github 項目,以及訂閱我們的專欄。

聯系我們

除了可以在

Github Issues

和我們聯系,也可以加入釘釘群 32697156 和我們交流。