由于随着神經網絡層數的增多,需要訓練的參數也會增多,随之而來需要的資料集就會很大,這樣會造成需要更大的運算資源,而且還要消耗很長的運算時間。TensorFlow提供了一個可以分布式部署的模式,将一個訓練任務拆分成多個小任務,配置到不同的計算機上完成協同運算,這樣使用計算機群運算來代替單機運算,可以使訓練時間大幅度縮短。
一 分布式TensorFlow角色以及原理
要想配置TensorFlow為分布訓練,首先需要了解TensorFlow中關于分布式的角色配置設定。
- ps:作為分布式訓練的服務端,等到各個終端(supervisors)來連接配接。
- worker:在TensorFlow的代碼注釋中被稱為supervisors,作為分布式訓練的運算終端。
- chief supervisors:在衆多運算終端中必須選中一個作為主要的運算終端。該終端是在運算終端中最先啟動的,它的功能是合并各個終端運算後的學習參數,将其儲存再寫入。
每個具體角色網絡辨別都是唯一的,即分布在不同IP的機器上(或者同一主機但不同端口号)。
在實際運作中,各個角色的網絡建構部分代碼必須完全相同。三者的分工如下:
- 伺服器端作為一個多方協調者,等待各個運算終端來連接配接。
- cheif supervisors會在啟動時統一管理全局的學習參數,進行初始化或從模型載入。
- 其它的運算終端隻是負責得到其相應的任務并進行計算,并不會儲存檢查點以及用于TensorBoard可視化的summary日志等任何參數資訊。
二 分布部署TensorFlow的具體方法
配置過程中,首先建立一個server,在server中會将ps以及所有worker的ip端口準備好,接着使用tf.train.Supervisor中的managed_seesion來管理打開的session,session隻負責運算,而通信協調的事情就都交給Supervisor來管理了。
三 使用TensorFlow實作分布式部署訓練
下面開始實作一個分布式訓練的網絡模型,仍然以線性回歸的模型作為原型,并将其改為分布式。使我們需要在本機通過3個端口來建立3個終端,分别是ps,兩個worker。代碼主要分為以下幾部分:
1.為每個角色建立IP位址和端口,建立server
首先建立叢集(cluster), ClusterSpec的定義,需要把你要跑這個任務的所有的ps和worker 的節點的ip和端口的資訊都包含進去, 所有的角色都要執行這段代碼, 就大家互相知道了, 這個叢集裡面都有哪些成員,不同的成員的類型是什麼, 是ps還是worker。
然後建立一個server,在server中會将ps以及所有worker的ip端口準備好,在同一台電腦開三個不同的端口,分别代表ps,chief supervisors和worker。角色的名稱用strjob_name表示。從 tf.train.Server這個的定義開始,就每個角色不一樣了。 如果角色名字是ps的話, 程式就join到這裡,作為參數更新的服務, 等待其他worker角色給它送出參數更新的資料。如果是worker角色,就執行後面的計算任務。以ps為例(先建立ps檔案):
'''
(1)為每個角色添加IP位址和端口,建立server
'''
'''定義IP和端口号'''
#指定伺服器ip和port
strps_hosts = '127.0.0.1:1234'
#指定兩個終端的ip和port
strworker_hosts = '127.0.0.1:1235,127.0.0.1:1236'
#定義角色名稱
strjob_name = 'ps'
task_index = 0
#将字元串轉為數組
ps_hosts = strps_hosts.split(',')
worker_hosts = strworker_hosts.split(',')
cluster_spec= tf.train.ClusterSpec({'ps':ps_hosts,'worker':worker_hosts})
#建立Server
server = tf.train.Server(
cluster_spec,
job_name = strjob_name,
task_index = task_index)
2.為ps角色添加等待函數
ps角色使用server.join()函數進行線程挂起,開始接受連接配接消息。
'''
(2) 為ps角色添加等待函數
'''
#ps角色處于監聽狀态,等待終端連接配接
if strjob_name == 'ps':
print('waiting....')
server.join()
3.建立網絡結構
與正常的程式不同,在建立網絡結構時,使用tf.device()函數将全部的節點都放在目前任務下。task:0對應worker1(可以了解為任務0對應着角色1),task:1對應worker2。
在rf.device()函數中的任務是通過tf.train.replica_device_setter()來指定的。
在tf.train.replica_device_setter()中使用worker_device()來定義具體任務名稱:使用cluster的配置來指定角色和對應的ip位址,進而實作整個任務下的圖節點,
'''
(3) 建立網絡結構
'''
#設定訓練集資料長度
n_train = 100
#生成x資料,[-1,1]之間,均分成n_train個資料
train_x = np.linspace(-1,1,n_train).reshape(n_train,1)
#把x乘以2,在加入(0,0.3)的高斯正太分布
train_y = 2*train_x + np.random.normal(loc=0.0,scale=0.3,size=[n_train,1])
#繪制x,y波形
plt.figure()
plt.plot(train_x,train_y,'ro',label='y=2x') #o使用圓點标記一個點
plt.legend()
plt.show()
#建立網絡結構時,通過tf.device()函數将全部的節點都放在目前任務下 task:0對應worker1 task:1對應worker2
with tf.device(tf.train.replica_device_setter(
worker_device = '/job:worker/task:{0}'.format(task_index),
cluster = cluster_spec)):
'''
前向回報
'''
#建立占位符
input_x = tf.placeholder(dtype=tf.float32)
input_y = tf.placeholder(dtype=tf.float32)
#模型參數
w = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='w') #設定正太分布參數 初始化權重
b = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='b') #設定正太分布參數 初始化偏置
#建立一個global_step變量
global_step = tf.train.get_or_create_global_step()
#前向結構
pred = tf.multiply(w,input_x) + b
#将預測值以直方圖形式顯示,給直方圖命名為'pred'
tf.summary.histogram('pred',pred)
'''
反向傳播bp
'''
#定義代價函數 選取二次代價函數
cost = tf.reduce_mean(tf.square(input_y - pred))
#将損失以标量形式顯示 該變量命名為loss_function
tf.summary.scalar('loss_function',cost)
#設定求解器 采用梯度下降法 學習了設定為0.001 并把global_step變量放到優化器中,這樣每運作一次優化器,globle_step就會自動獲得目前疊代的次數
train = tf.train.GradientDescentOptimizer(learning_rate=0.001).minimize(cost,global_step = global_step)
saver = tf.train.Saver(max_to_keep = 1)
#合并所有的summary
merged_summary_op = tf.summary.merge_all()
#初始化所有變量,是以變量需要放在其前面定義
init =tf.global_variables_initializer()
為了使載入檢查點檔案能夠同步循環次數,這裡添加了一個global_step變量,将其放到優化器中。這樣每運作一次優化器,global_step就會自動加1.
4.建立Supervisor,管理session
'''
(4)建立Supervisor,管理session
'''
training_epochs = 2000
display_step = 20
sv = tf.train.Supervisor(is_chief = (task_index == 0), #0号worker為chief
logdir='./LinearRegression/super/', #檢查點和summary檔案儲存的路徑
init_op = init, #初始化所有變量
summary_op = None, #summary_op用于自動儲存summary檔案,設定為None,表示不自動儲存
saver = saver, #将儲存檢查點的saver對象傳入,supervisor會自動儲存檢查點檔案。否則設定為None
global_step = global_step,
save_model_secs = 50 #儲存檢查點檔案的時間間隔
)
- 在tf.train.Supervisor()函數中,is_cheif表明了是否為cheif supervisors角色,這裡将task_index = 0的worker設定成chief supervisors。
- logdir:為檢查點和summary日志檔案的儲存路徑。不過這個似乎啟動就會去這個logdir的目錄去看有沒有checkpoint的檔案, 有的話就自動裝載了,沒有就用init_op指定的初始化參數。
- init_op:表示使用初始化變量的函數。
- summary_op:将儲存summary的對象傳入,就會自動儲存summary檔案。這裡設定為None,表示不自動儲存。
- saver:将儲存檢查點的saver對象傳入,Supervisor就會自動儲存檢查點檔案。如果不想自動儲存,就設定為None。
- global_step:為疊代次數。
- save_model_op:為儲存檢查點檔案的時間間隔,這裡設定成50,表明每50秒自動儲存一次檢查點檔案。為了使程式運作時間長一些,我們更改了training_epochs參數。
5.疊代訓練
session中的内容和之前的一樣,直接疊代訓練即可,由于使用了Supervisor管理session,将使用sv.summary_computed函數來儲存summary檔案,同樣,如果想要手動儲存監測點檔案,也可以使用sv.saver.save()函數。
'''
(5) 疊代訓練
'''
#連接配接目标角色建立session
with sv.managed_session(server.target) as sess:
print("sess ok:")
print(global_step.eval(session=sess))
print('開始疊代:')
#存放批次值和代價值
plotdata = {'batch_size':[],'loss':[]}
#開始疊代 這裡step表示目前執行步數,疊代training_epochs輪 需要執行training_epochs*n_train步
for step in range(training_epochs*n_train):
for (x,y) in zip(train_x,train_y):
#開始執行圖 并傳回目前步數
_,step = sess.run([train,global_step],feed_dict={input_x:x,input_y:y})
#生成summary
summary_str = sess.run(merged_summary_op,feed_dict={input_x:x,input_y:y})
#将summary寫入檔案 手動儲存summary日志檔案
sv.summary_computed(sess,summary_str,global_step = step)
#一輪訓練完成後 列印輸出資訊
if step % display_step == 0:
#計算代價值
loss = sess.run(cost,feed_dict={input_x:train_x,input_y:train_y})
print('step {0} cost {1} w {2} b{3}'.format(step,loss,sess.run(w),sess.run(b)))
#儲存每display_step輪訓練後的代價值以及目前疊代輪數
if not loss == np.nan:
plotdata['batch_size'].append(step)
plotdata['loss'].append(loss)
print('Finished!')
#手動儲存檢查點檔案
#sv.saver.save(sess,'./LinearRegression/sv/sv.cpkt',global_step = step)
sv.stop()
- 在設定了自動儲存檢查點檔案後,手動儲存仍然有效。程式裡我們在Supervisor對象建立的時候指定了自動儲存檢查點檔案,程式裡被我注釋掉的最後一行是采用手動儲存檢查點檔案。
- 在Supervisor對象建立的時候指定了不自動儲存summary日志檔案,我們采用了手動儲存,調用了sv.summary_computed()函數。
- 在運作一半後終止,再運作Supervisor時會自動載入模型的參數,不需要手動調用saver.restore()。
- 在session中,不需要再運作tf.global_variables_initializer()函數。因為在Supervisor建立的時候回調用傳入的init_op進行初始化,如果加了sess.run(tf.global_variables_initializer()),則會導緻所載入模型的變量被二次清空。
6.建立worker檔案
将ps.py檔案複制兩份,一個叫worker1.py,一個叫worker2.py。将角色名稱修改為worker,并将worker2.py中的task_index修改為1。同時需要将worker2.py檔案中手動儲存summary日志的代碼注釋掉。
worker1.py檔案修改如下:
#定義角色名稱
strjob_name = 'worker'
task_index = 0
worker2.py檔案修改如下:
#定義角色名稱
strjob_name = 'worker'
task_index = 1
在這個程式中使用了sv.summary_computed()函數手動将運作時動态的資料儲存下來,以便于在TensorBoard中檢視,但是在分布式部署的時候,使用該功能還需要注意以下幾點:
- worker2檔案中不能使用sv.summary_computed()函數,因為worker2不是chief supervisors,在worker2中是不會為Supervisor對象構造預設summary_writer(所有的summary日志資訊都要通過該對象進行寫)對象的,是以即使程式調用sv.summary_computed()也無法執行下去,程式會報錯。
- 手寫控制summary日志和檢查點檔案儲存時,需要将chief supervisors以外的worker全部去掉才可以,可以使用Supervisor按時間間隔儲存的形式來管理,這樣用一套代碼就可以解決了。
7.部署運作
在spyder中先将ps.py檔案運作起來,選擇菜單Consoles->Open an Ipython console,新打開一個Consoles,如下圖

在spider面闆右下角,可以看到在原有标簽為'Console 1/A'标簽又多了一個‘Console 2/A’标簽,選中這個标簽,就激活了這個标簽。
運作worker2.py檔案。同理,啟動'Console 3/A'運作worker1.py檔案。
下面我們可以看到worker1.py檔案的輸出:
我們在程式中設定display_step為20,即疊代20次輸出一次資訊,我們可能看到這個輸出并不是連續的,這是因為跳過的步驟被配置設定到了worker2中去運算了。
worker2.py檔案對應的視窗顯示的資訊如下:
從圖中可以看到worker2和chief supervisors的疊代順序是互補,但也有可能是沒有絕對互補的,但是為什麼有時候沒有絕對互補?可能與Supervisor中的同步算法有關。
分布運算的目的是為了提高整體運算速度,如果同步epoch的準确度需要以犧牲總體運算速度為代價,自然很不合适。是以更合理的推斷是因為單機單次運算太快迫使算法使用了更寬松的同步機制。
重要的一點是對于指定步數的學習參數w和b是一緻的。即統一疊代論述的值是一樣的,這表明兩個終端是在相同的起點上進行運算的。
對于ps.py檔案,其對應的視窗一直默默的隻顯示列印的那句話waiting....,因為它隻負責連接配接參與運算。
四 最後再補充一些名詞解釋
用戶端(Client)
- 用戶端是一個用于建立TensorFlow計算圖并創立與叢集進行互動的會話層
的程式。一般用戶端是通過python或C++實作的。一個獨立的用戶端程序可以同時與多個TensorFlow的服務端相連 ,同時一個獨立的服務端也可以與多個用戶端相連。tensorflow::Session
叢集(Cluster)
- 一個TensorFlow的叢集裡包含了一個或多個作業(job), 每一個作業又可以拆分成一個或多個任務(task)。叢集的概念主要用與一個特定的高層次對象中,比如說訓練神經網絡,并行化操作多台機器等等。叢集對象可以通過
來定義。tf.train.ClusterSpec
作業(Job)
- 一個作業可以拆封成多個具有相同目的的任務(task),比如說,一個稱之為ps(parameter server,參數伺服器)的作業中的任務主要是儲存和更新變量,而一個名為worker(工作)的作業一般是管理無狀态且主要從事計算的任務。一個作業中的任務可以運作于不同的機器上,作業的角色也是靈活可變的,比如說稱之為”worker”的作業可以儲存一些狀态。
任務(Task)
-
任務相當于是一個特定的TesnsorFlow服務端,其相當于一個獨立的程序,該程序屬于特定的作業并在作業中擁有對應的序号。
TensorFlow服務端(TensorFlow server) 。
ps.py完整代碼:
# -*- coding: utf-8 -*-
"""
Created on Thu Apr 19 08:52:30 2018
@author: zy
"""
import tensorflow as tf
import numpy as np
import os
import matplotlib.pyplot as plt
'''
分布式計算
'''
'''
(1)為每個角色添加IP位址和端口,建立server
'''
'''定義IP和端口号'''
#指定伺服器ip和port
strps_hosts = '127.0.0.1:1234'
#指定兩個終端的ip和port
strworker_hosts = '127.0.0.1:1235,127.0.0.1:1236'
#定義角色名稱
strjob_name = 'ps'
task_index = 0
#将字元串轉為數組
ps_hosts = strps_hosts.split(',')
worker_hosts = strworker_hosts.split(',')
cluster_spec = tf.train.ClusterSpec({'ps': ps_hosts,'worker': worker_hosts})
#建立server
server = tf.train.Server(
cluster_spec,
job_name = strjob_name,
task_index = task_index)
'''
(2) 為ps角色添加等待函數
'''
#ps角色處于監聽狀态,等待終端連接配接
if strjob_name == 'ps':
print('waiting....')
server.join()
'''
(3) 建立網絡結構
'''
#設定訓練集資料長度
n_train = 100
#生成x資料,[-1,1]之間,均分成n_train個資料
train_x = np.linspace(-1,1,n_train).reshape(n_train,1)
#把x乘以2,在加入(0,0.3)的高斯正太分布
train_y = 2*train_x + np.random.normal(loc=0.0,scale=0.3,size=[n_train,1])
#繪制x,y波形
plt.figure()
plt.plot(train_x,train_y,'ro',label='y=2x') #o使用圓點标記一個點
plt.legend()
plt.show()
#建立網絡結構時,通過tf.device()函數将全部的節點都放在目前任務下
with tf.device(tf.train.replica_device_setter(
worker_device = '/job:worker/task:{0}'.format(task_index),
cluster = cluster_spec)):
'''
前向回報
'''
#建立占位符
input_x = tf.placeholder(dtype=tf.float32)
input_y = tf.placeholder(dtype=tf.float32)
#模型參數
w = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='w') #設定正太分布參數 初始化權重
b = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='b') #設定正太分布參數 初始化偏置
#建立一個global_step變量
global_step = tf.train.get_or_create_global_step()
#前向結構
pred = tf.multiply(w,input_x) + b
#将預測值以直方圖形式顯示,給直方圖命名為'pred'
tf.summary.histogram('pred',pred)
'''
反向傳播bp
'''
#定義代價函數 選取二次代價函數
cost = tf.reduce_mean(tf.square(input_y - pred))
#将損失以标量形式顯示 該變量命名為loss_function
tf.summary.scalar('loss_function',cost)
#設定求解器 采用梯度下降法 學習了設定為0.001 并把global_step變量放到優化器中,這樣每運作一次優化器,global_step就會自動獲得目前疊代的次數
train = tf.train.GradientDescentOptimizer(learning_rate=0.001).minimize(cost,global_step = global_step)
saver = tf.train.Saver(max_to_keep = 1)
#合并所有的summary
merged_summary_op = tf.summary.merge_all()
#初始化所有變量,是以變量需要放在其前面定義
init =tf.global_variables_initializer()
'''
(4)建立Supervisor,管理session
'''
training_epochs = 2000
display_step = 20
sv = tf.train.Supervisor(is_chief = (task_index == 0), #0号worker為chief
logdir='./LinearRegression/super/', #檢查點和summary檔案儲存的路徑
init_op = init, #初始化所有變量
summary_op = None, #summary_op用于自動儲存summary檔案,設定為None,表示不自動儲存
saver = saver, #将儲存檢查點的saver對象傳入,supervisor會自動儲存檢查點檔案。否則設定為None
global_step = global_step,
save_model_secs = 50 #儲存檢查點檔案的時間間隔
)
'''
(5) 疊代訓練
'''
#連接配接目标角色建立session
with sv.managed_session(server.target) as sess:
print("sess ok:")
print(global_step.eval(session=sess))
print('開始疊代:')
#存放批次值和代價值
plotdata = {'batch_size':[],'loss':[]}
#開始疊代 這裡step表示目前執行步數,疊代training_epochs輪 需要執行training_epochs*n_train步
for step in range(training_epochs*n_train):
for (x,y) in zip(train_x,train_y):
#開始執行圖 并傳回目前步數
_,step = sess.run([train,global_step],feed_dict={input_x:x,input_y:y})
#生成summary
summary_str = sess.run(merged_summary_op,feed_dict={input_x:x,input_y:y})
#将summary寫入檔案 手動儲存summary日志檔案
sv.summary_computed(sess,summary_str,global_step = step)
#一輪訓練完成後 列印輸出資訊
if step % display_step == 0:
#計算代價值
loss = sess.run(cost,feed_dict={input_x:train_x,input_y:train_y})
print('step {0} cost {1} w {2} b{3}'.format(step,loss,sess.run(w),sess.run(b)))
#儲存每display_step輪訓練後的代價值以及目前疊代輪數
if not loss == np.nan:
plotdata['batch_size'].append(step)
plotdata['loss'].append(loss)
print('Finished!')
#手動儲存檢查點檔案
#sv.saver.save(sess,'./LinearRegression/sv/sv.cpkt',global_step = step)
sv.stop()
View Code
worker1.py完整代碼:
# -*- coding: utf-8 -*-
"""
Created on Thu Apr 19 08:52:30 2018
@author: zy
"""
import tensorflow as tf
import numpy as np
import os
import matplotlib.pyplot as plt
'''
分布式計算
'''
'''
(1)為每個角色添加IP位址和端口,建立worker
'''
'''定義IP和端口号'''
#指定伺服器ip和port
strps_hosts = '127.0.0.1:1234'
#指定兩個終端的ip和port
strworker_hosts = '127.0.0.1:1235,127.0.0.1:1236'
#定義角色名稱
strjob_name = 'worker'
task_index = 0
#将字元串轉為數組
ps_hosts = strps_hosts.split(',')
worker_hosts = strworker_hosts.split(',')
cluster_spec = tf.train.ClusterSpec({'ps': ps_hosts,'worker': worker_hosts})
#建立server
server = tf.train.Server(
cluster_spec,
job_name = strjob_name,
task_index = task_index)
'''
(2) 為ps角色添加等待函數
'''
#ps角色處于監聽狀态,等待終端連接配接
if strjob_name == 'ps':
print('waiting....')
server.join()
'''
(3) 建立網絡結構
'''
#設定訓練集資料長度
n_train = 100
#生成x資料,[-1,1]之間,均分成n_train個資料
train_x = np.linspace(-1,1,n_train).reshape(n_train,1)
#把x乘以2,在加入(0,0.3)的高斯正太分布
train_y = 2*train_x + np.random.normal(loc=0.0,scale=0.3,size=[n_train,1])
#繪制x,y波形
plt.figure()
plt.plot(train_x,train_y,'ro',label='y=2x') #o使用圓點标記一個點
plt.legend()
plt.show()
#建立網絡結構時,通過tf.device()函數将全部的節點都放在目前任務下
with tf.device(tf.train.replica_device_setter(
worker_device = '/job:worker/task:{0}'.format(task_index),
cluster = cluster_spec)):
'''
前向回報
'''
#建立占位符
input_x = tf.placeholder(dtype=tf.float32)
input_y = tf.placeholder(dtype=tf.float32)
#模型參數
w = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='w') #設定正太分布參數 初始化權重
b = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='b') #設定正太分布參數 初始化偏置
#建立一個global_step變量
global_step = tf.train.get_or_create_global_step()
#前向結構
pred = tf.multiply(w,input_x) + b
#将預測值以直方圖形式顯示,給直方圖命名為'pred'
tf.summary.histogram('pred',pred)
'''
反向傳播bp
'''
#定義代價函數 選取二次代價函數
cost = tf.reduce_mean(tf.square(input_y - pred))
#将損失以标量形式顯示 該變量命名為loss_function
tf.summary.scalar('loss_function',cost)
#設定求解器 采用梯度下降法 學習了設定為0.001 并把global_step變量放到優化器中,這樣每運作一次優化器,global_step就會自動獲得目前疊代的次數
train = tf.train.GradientDescentOptimizer(learning_rate=0.001).minimize(cost,global_step = global_step)
saver = tf.train.Saver(max_to_keep = 1)
#合并所有的summary
merged_summary_op = tf.summary.merge_all()
#初始化所有變量,是以變量需要放在其前面定義
init =tf.global_variables_initializer()
'''
(4)建立Supervisor,管理session
'''
training_epochs = 2000
display_step = 20
sv = tf.train.Supervisor(is_chief = (task_index == 0), #0号worker為chief
logdir='./LinearRegression/super/', #檢查點和summary檔案儲存的路徑
init_op = init, #初始化所有變量
summary_op = None, #summary_op用于自動儲存summary檔案,設定為None,表示不自動儲存
saver = saver, #将儲存檢查點的saver對象傳入,supervisor會自動儲存檢查點檔案。否則設定為None
global_step = global_step,
save_model_secs = 50 #儲存檢查點檔案的時間間隔
)
'''
(5) 疊代訓練
'''
#連接配接目标角色建立session
with sv.managed_session(server.target) as sess:
print("sess ok:")
print(global_step.eval(session=sess))
print('開始疊代:')
#存放批次值和代價值
plotdata = {'batch_size':[],'loss':[]}
#開始疊代 這裡step表示目前執行步數,疊代training_epochs輪 需要執行training_epochs*n_train步
for step in range(training_epochs*n_train):
for (x,y) in zip(train_x,train_y):
#開始執行圖 并傳回目前步數
_,step = sess.run([train,global_step],feed_dict={input_x:x,input_y:y})
#生成summary
summary_str = sess.run(merged_summary_op,feed_dict={input_x:x,input_y:y})
#将summary寫入檔案 手動儲存summary日志檔案
sv.summary_computed(sess,summary_str,global_step = step)
#一輪訓練完成後 列印輸出資訊
if step % display_step == 0:
#計算代價值
loss = sess.run(cost,feed_dict={input_x:train_x,input_y:train_y})
print('step {0} cost {1} w {2} b{3}'.format(step,loss,sess.run(w),sess.run(b)))
#儲存每display_step輪訓練後的代價值以及目前疊代輪數
if not loss == np.nan:
plotdata['batch_size'].append(step)
plotdata['loss'].append(loss)
print('Finished!')
#手動儲存檢查點檔案
#sv.saver.save(sess,'./LinearRegression/sv/sv.cpkt',global_step = step)
sv.stop()
worker2.py完整代碼:
# -*- coding: utf-8 -*-
"""
Created on Thu Apr 19 08:52:30 2018
@author: zy
"""
import tensorflow as tf
import numpy as np
import os
import matplotlib.pyplot as plt
'''
分布式計算
'''
'''
(1)為每個角色添加IP位址和端口,建立worker
'''
'''定義IP和端口号'''
#指定伺服器ip和port
strps_hosts = '127.0.0.1:1234'
#指定兩個終端的ip和port
strworker_hosts = '127.0.0.1:1235,127.0.0.1:1236'
#定義角色名稱
strjob_name = 'worker'
task_index = 1
#将字元串轉為數組
ps_hosts = strps_hosts.split(',')
worker_hosts = strworker_hosts.split(',')
cluster_spec = tf.train.ClusterSpec({'ps': ps_hosts,'worker': worker_hosts})
#建立server
server = tf.train.Server(
cluster_spec,
job_name = strjob_name,
task_index = task_index)
'''
(2) 為ps角色添加等待函數
'''
#ps角色處于監聽狀态,等待終端連接配接
if strjob_name == 'ps':
print('waiting....')
server.join()
'''
(3) 建立網絡結構
'''
#設定訓練集資料長度
n_train = 100
#生成x資料,[-1,1]之間,均分成n_train個資料
train_x = np.linspace(-1,1,n_train).reshape(n_train,1)
#把x乘以2,在加入(0,0.3)的高斯正太分布
train_y = 2*train_x + np.random.normal(loc=0.0,scale=0.3,size=[n_train,1])
#繪制x,y波形
plt.figure()
plt.plot(train_x,train_y,'ro',label='y=2x') #o使用圓點标記一個點
plt.legend()
plt.show()
#建立網絡結構時,通過tf.device()函數将全部的節點都放在目前任務下
with tf.device(tf.train.replica_device_setter(
worker_device = '/job:worker/task:{0}'.format(task_index),
cluster = cluster_spec)):
'''
前向回報
'''
#建立占位符
input_x = tf.placeholder(dtype=tf.float32)
input_y = tf.placeholder(dtype=tf.float32)
#模型參數
w = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='w') #設定正太分布參數 初始化權重
b = tf.Variable(tf.truncated_normal(shape=[1],mean=0.0,stddev=1),name='b') #設定正太分布參數 初始化偏置
#建立一個global_step變量
global_step = tf.train.get_or_create_global_step()
#前向結構
pred = tf.multiply(w,input_x) + b
#将預測值以直方圖形式顯示,給直方圖命名為'pred'
tf.summary.histogram('pred',pred)
'''
反向傳播bp
'''
#定義代價函數 選取二次代價函數
cost = tf.reduce_mean(tf.square(input_y - pred))
#将損失以标量形式顯示 該變量命名為loss_function
tf.summary.scalar('loss_function',cost)
#設定求解器 采用梯度下降法 學習了設定為0.001 并把global_step變量放到優化器中,這樣每運作一次優化器,global_step就會自動獲得目前疊代的次數
train = tf.train.GradientDescentOptimizer(learning_rate=0.001).minimize(cost,global_step = global_step)
saver = tf.train.Saver(max_to_keep = 1)
#合并所有的summary
merged_summary_op = tf.summary.merge_all()
#初始化所有變量,是以變量需要放在其前面定義
init =tf.global_variables_initializer()
'''
(4)建立Supervisor,管理session
'''
training_epochs = 2000
display_step = 20
sv = tf.train.Supervisor(is_chief = (task_index == 0), #0号worker為chief
logdir='./LinearRegression/super/', #檢查點和summary檔案儲存的路徑
init_op = init, #初始化所有變量
summary_op = None, #summary_op用于自動儲存summary檔案,設定為None,表示不自動儲存
saver = saver, #将儲存檢查點的saver對象傳入,supervisor會自動儲存檢查點檔案。否則設定為None
global_step = global_step,
save_model_secs = 50 #儲存檢查點檔案的時間間隔
)
'''
(5) 疊代訓練
'''
#連接配接目标角色建立session
with sv.managed_session(server.target) as sess:
print("sess ok:")
print(global_step.eval(session=sess))
print('開始疊代:')
#存放批次值和代價值
plotdata = {'batch_size':[],'loss':[]}
#開始疊代 這裡step表示目前執行步數,疊代training_epochs輪 需要執行training_epochs*n_train步
for step in range(training_epochs*n_train):
for (x,y) in zip(train_x,train_y):
#開始執行圖 并傳回目前步數
_,step = sess.run([train,global_step],feed_dict={input_x:x,input_y:y})
#生成summary
summary_str = sess.run(merged_summary_op,feed_dict={input_x:x,input_y:y})
#将summary寫入檔案 手動儲存summary日志檔案
#sv.summary_computed(sess,summary_str,global_step = step)
#一輪訓練完成後 列印輸出資訊
if step % display_step == 0:
#計算代價值
loss = sess.run(cost,feed_dict={input_x:train_x,input_y:train_y})
print('step {0} cost {1} w {2} b{3}'.format(step,loss,sess.run(w),sess.run(b)))
#儲存每display_step輪訓練後的代價值以及目前疊代輪數
if not loss == np.nan:
plotdata['batch_size'].append(step)
plotdata['loss'].append(loss)
print('Finished!')
#手動儲存檢查點檔案
#sv.saver.save(sess,'./LinearRegression/sv/sv.cpkt',global_step = step)
sv.stop()