在Keras上使用
我們已經将 tf.distribute.Strategy 內建到 tf.keras 中。tf.keras 是一個建構和訓練模型的進階API。通過內建到 tf.keras 後端, 用Keras訓練架構寫的程式可以無縫進行分布式訓練。
您需要對代碼中進行以下更改:
建立一個 tf.distribute.Strategy 執行個體
将Keras模型的建立和編譯過程挪到strategy.scope中
支援各種類型的Keras模型:順序模型、函數式模型和子類模型:
下面是一個非常簡單的Keras模型示例:
mirrored_strategy = tf.distribute.MirroredStrategy()
with mirrored_strategy.scope():
model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
model.compile(loss='mse', optimizer='sgd')
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
'''
Epoch 1/2
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 2ms/step - loss: 1.4031
Epoch 2/2
10/10 [==============================] - 0s 1ms/step - loss: 0.6202
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 0s 1ms/step - loss: 0.3851
0.3851303160190582
'''
在自定義訓練循環上使用
在進階 API上使用tf.distribute.Strategy 隻需要修改幾行代碼. 再多花點功夫,你也可以在自定義訓練循環上應用 tf.distribute.Strategy.
如果您需要比Estimator或Keras更大的靈活性和對訓練循環更強的控制,則可以編寫自定義訓練循環。例如,使用GAN時,您可能希望每輪生成器和鑒别器訓練不同的steps。同樣,進階架構也不太适合強化學習。
為了支援自定義訓練循環, 我們通過 tf.distribute.Strategy 類提供了一組核心方法。剛開始使用這些代碼需要對代碼進行較小的重組,但是一旦完成,隻需更改Strategy執行個體就應該能夠在GPU、TPU和多台計算機之間切換。
在這裡,我們将展示一個簡短的代碼片段說明這個用例:使用與以前一樣的Keras模型。
首先,需要在政策範圍内建立模型和Optimizer。這樣可以確定使用模型和Optimizer建立的任何變量都是鏡像變量。
with mirrored_strategy.scope():
model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
optimizer = tf.keras.optimizers.SGD()
#接下來, 我們要建立輸入資料集并調用 tf.distribute.Strategy.experimental_distribute_dataset 将資料集按此政策分布:
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(1000).batch(
global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
#然後定義訓練的一個step,使用 tf.GradientTape 計算梯度,使用optimizer将梯度用于更新model變量。要分發這個訓練步驟, 我們把它放到 step_fn 函數中,将次函數和從 dist_dataset 建立的資料集一起傳遞給 tf.distrbute.Strategy.run :
@tf.function
def train_step(dist_inputs):
def step_fn(inputs):
features, labels = inputs
with tf.GradientTape() as tape:
# training=True is only needed if there are layers with different
# behavior during training versus inference (e.g. Dropout).
logits = model(features, training=True)
cross_entropy = tf.nn.softmax_cross_entropy_with_logits(
logits=logits, labels=labels)
loss = tf.reduce_sum(cross_entropy) * (1.0 / global_batch_size)
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
return cross_entropy
per_example_losses = mirrored_strategy.run(step_fn, args=(dist_inputs,))
mean_loss = mirrored_strategy.reduce(
tf.distribute.ReduceOp.MEAN, per_example_losses, axis=0)
return mean_loss
#定義好training_step之後, 我們就可以疊代 dist_dataset 循環進行訓練
with mirrored_strategy.scope():
for inputs in dist_dataset:
print(train_step(inputs))