
1. 訓練的瓶頸在哪裡
- GPU使用率低:模型訓練時GPU顯存沾滿了,但是GPU的使用率比較不穩定,有時候0%,有時候90%,忽高忽低。
- 訓練的資料量大:訓練資料大,在百萬/千萬的量級,訓練一個Epoch需要很長時間,模型疊代周期過長。
2. 提高GPU使用率:CPU vs GPU
GPU使用率低, 主要原因是CPU處理的效率跟不上GPU
2.1 CPU vs GPU的通信
- CPU負責加載資料+資料預處理,并不斷的在記憶體和顯存之間互動資料
- GPU負責模型訓練(圖檔來自網絡)
2.2 解決方案
采用多程序并行處理,加快CPU加載資料的性能
- keras keras 中提供了workers use_multiprocessing來采用多程序方式,并行處理資料,并push到隊列中,共GPU模型訓練。因為程序之間可能互相影響資源,并不是越大越好,workers可以設定2,4,8。
- run_model.fit_generator(
- generator=training_generator,
- class_weight={0: config.weights, 1: 1},
- epochsepochs=epochs,
- verbose=1,
- steps_per_epochsteps_per_epoch=steps_per_epoch,
- callbacks=callbacks_list,
- validation_data=valid_generator,
- validation_stepsvalidation_steps=validation_steps,
- shuffle=True,
- workers=8,
- use_multiprocessing=True,
- max_queue_size=20
- pytorch torch在加載資料中提供類似參數num_workers。pin_memory=True可以直接加載到顯存中,而不需要記憶體
- torch.utils.data.DataLoader(image_datasets[x],
- batch_sizebatch_size=batch_size,
- shuffle=True,
- num_workers=8,
- pin_memory=True)
3. 分布式并行訓練
3.1 并行模式
當訓練的資料量很大時,可以通過多個機器多個GPU來提高訓練的效率。不同于hadoop和spark等分布式資料處理架構,深度學習訓練因為要涉及參數的前項傳播和反向傳播,有兩種并行方式:
- 模型并行( model parallelism ):分布式系統中的不同機器(GPU/CPU等)負責網絡模型的不同部分,通常是神經網絡模型的不同網絡層被配置設定到不同的機器,或者同一層内部的不同參數被配置設定到不同機器。一般是超大的模型,一張顯示卡放不下的情況,如NLP的模型。模型并行的缺點是層和層之間可能存在依賴關系,不能完全的并行。(圖檔來自網絡)
- 資料并行( data parallelism ):不同的機器有同一個模型的多個副本,每個機器配置設定到不同的資料,然後将所有機器的計算結果按照某種方式合并。這種就比較适合大資料的情況。資料并行要解決的問題是資料的分割和傳輸,以及參數的更新。
3.2 資料并行
Facebook在《Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour》介紹了使用 256 塊 GPU 進行 ResNet-50 網絡「資料并行」訓練的方法
- 資料分割: 選用大的batch-size, 按照worker數量進行分割, 分發到不同worker執行
- 參數更新:參數的更新有兩種模式(1)參數伺服器 (2) ring環狀更新(無伺服器模式)
3.2.1 參數伺服器模式
參數伺服器模式,見下圖。在每個worker執行完一個batch的訓練後,反向傳播參數的時候,所有的worker都會把參數傳給參數伺服器,進行彙總求均值,之後再傳給每個worker,進入第二個batch的訓練。(圖檔來自網絡)
參數伺服器有一個或者多個的結構模式,可以看出這種資料并行的模式效率是否提升取決于參數伺服器與worker之間的通信效率,也就是最慢的worker的訓練時間和參數伺服器的接收和更新參數後再回傳的時間。worker數量多的話,參數伺服器可能存在瓶頸。(圖檔來自網絡)
3.2.2 ring-reduce
百度提出的ring-reduce摒棄了參數伺服器,采用環狀結構來更新參數。ring-reduce把所有的worker組成一個兩兩相鄰的環形結構。每個worker隻與相鄰的worker交換參數。經過幾次交換之後,所有的worker都包含其他worker的參數資訊,達到更新的目的。(圖檔來自網絡)
下面幾張圖,可以看到其中的幾個步驟;ring-reduce為了加快速度,并不是一次性交換所有的參數;而是先把參數進行分割,不斷交換分割後參數。
4. 實作架構:Horovod
Horovod 是 Uber 開源的又一個深度學習工具,它的發展吸取了 Facebook「一小時訓練 ImageNet 論文」與百度 Ring Allreduce 的優點,可為使用者實作分布式訓練提供幫助。https://github.com/horovod/horovod
采用NCCL 替換百度的 ring-allreduce 實作。NCCL 是英偉達的集合通信庫,提供高度優化的 ring-allreduce 版本。NCCL 2 允許在多個機器之間運作 ring-allreduc。
如果要把單機的訓練代碼修改成分布式的代碼,隻要幾個步驟就可以了 改造分布式訓練:
- horovod安裝 建議安裝docker的horovod,省去安裝環境的麻煩。horovod依賴NCCL 2 open MPI
- $ mkdir horovod-docker-gpu
- $ wget -O horovod-docker-gpu/Dockerfile https://raw.githubusercontent.com/horovod/horovod/master/Dockerfile.gpu
- $ docker build -t horovod:latest horovod-docker-gpu
- 機器worker機器之間ssh打通
- 修改訓練代碼 horovod支援tf,keras,pytorch和mxnet等不同的深度學習架構。以keras為例,修改主要6個步驟 (1) 初始化:hvd.init() (2)配置設定GPU計算資源:config.gpu_options.visible_device_list = str(hvd.local_rank())(3)分布式的優化器來實作參數的分布式更新:opt = hvd.DistributedOptimizer(opt)(4)定義所有worker模型初始化一緻性 hvd.callbacks.BroadcastGlobalVariablesCallback(0)(5)模型儲存在某一個worker
- from __future__ import print_function
- import keras
- from keras.datasets import mnist
- from keras.models import Sequential
- from keras.layers import Dense, Dropout, Flatten
- from keras.layers import Conv2D, MaxPooling2D
- from keras import backend as K
- import math
- import tensorflow as tf
- import horovod.keras as hvd
- # Horovod: initialize Horovod.
- hvd.init()
- # Horovod: pin GPU to be used to process local rank (one GPU per process)
- config = tf.ConfigProto()
- config.gpu_options.allow_growth = True
- config.gpu_options.visible_device_list = str(hvd.local_rank())
- K.set_session(tf.Session(configconfig=config))
- batch_size = 128
- num_classes = 10
- # Horovod: adjust number of epochs based on number of GPUs.
- epochs = int(math.ceil(12.0 / hvd.size()))
- # Input image dimensions
- img_rows, img_cols = 28, 28
- # The data, shuffled and split between train and test sets
- (x_train, y_train), (x_test, y_test) = mnist.load_data()
- if K.image_data_format() == 'channels_first':
- x_trainx_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
- x_testx_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
- input_shape = (1, img_rows, img_cols)
- else:
- x_trainx_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
- x_testx_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
- input_shape = (img_rows, img_cols, 1)
- x_trainx_train = x_train.astype('float32')
- x_testx_test = x_test.astype('float32')
- x_train /= 255
- x_test /= 255
- print('x_train shape:', x_train.shape)
- print(x_train.shape[0], 'train samples')
- print(x_test.shape[0], 'test samples')
- # Convert class vectors to binary class matrices
- y_train = keras.utils.to_categorical(y_train, num_classes)
- y_test = keras.utils.to_categorical(y_test, num_classes)
- model = Sequential()
- model.add(Conv2D(32, kernel_size=(3, 3),
- activation='relu',
- input_shapeinput_shape=input_shape))
- model.add(Conv2D(64, (3, 3), activation='relu'))
- model.add(MaxPooling2D(pool_size=(2, 2)))
- model.add(Dropout(0.25))
- model.add(Flatten())
- model.add(Dense(128, activation='relu'))
- model.add(Dropout(0.5))
- model.add(Dense(num_classes, activation='softmax'))
- # Horovod: adjust learning rate based on number of GPUs.
- opt = keras.optimizers.Adadelta(1.0 * hvd.size())
- # Horovod: add Horovod Distributed Optimizer.
- opt = hvd.DistributedOptimizer(opt)
- model.compile(loss=keras.losses.categorical_crossentropy,
- optoptimizer=opt,
- metrics=['accuracy'])
- callbacks = [
- # Horovod: broadcast initial variable states from rank 0 to all other processes.
- # This is necessary to ensure consistent initialization of all workers when
- # training is started with random weights or restored from a checkpoint.
- hvd.callbacks.BroadcastGlobalVariablesCallback(0),
- ]
- # Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
- if hvd.rank() == 0:
- callbacks.append(keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))
- model.fit(x_train, y_train,
- batch_sizebatch_size=batch_size,
- callbackscallbacks=callbacks,
- epochsepochs=epochs,
- verbose=1,
- validation_data=(x_test, y_test))
- score = model.evaluate(x_test, y_test, verbose=0)
- print('Test loss:', score[0])
- print('Test accuracy:', score[1])
- 利用horovodrun 執行分布式訓練
horovodrun -np 16 -H server1:4,server2:4,server3:4,server4:4 python train.py
- 并行CPU加載資料和預處理,讓GPU不再等待CPU
- 采用Horovod讓資料并行來提高大資料量的訓練的疊代時間
- 人工智能技術走向成熟,AI語音行業迎來廣闊發展前景
- 未來屬于人工智能工程師,但成功轉型不容易
- 人工智能是如何成為“智商檢測器”的?
- Gartner:人工智能2020年成熟度曲線,哪些技術有價值
- 如何確定人工智能和機器學習項目的安全性