天天看點

TensorFlow(五)隊列與線程

深度學習的模型訓練過程往往需要大量的資料,而将這些資料一次性的讀入和預處理需要大量的時間開銷,是以通常采用隊列與多線程的思想解決這個問題,而且TensorFlow為我們提供了完善的函數。 

TensorFlow提供了整套實作隊列的函數和方法,在TensorFlow中,隊列和變量類似,都是計算圖上有狀态的節點。操作隊列的函數主要有:

  • FIFOQueue():建立一個先入先出(FIFO)的隊列
  • RandomShuffleQueue():建立一個随機出隊的隊列
  • enqueue_many():初始化隊列中的元素
  • dequeue():出隊
  • enqueue():入隊
import tensorflow as tf
with tf.Session() as sess:
    q = tf.FIFOQueue(3 , "float")
    
    #進行三步操作
    init = q.enqueue_many(([0.1 , 0.2 , 0.3],)) #建立隊列
    init2 = q.dequeue() #出隊列
    init3 = q.enqueue(1) #将1入隊
    
    #執行
    sess.run(init)
    sess.run(init2)
    sess.run(init3)

    #将所有隊列元素出隊列印    
    quelen = sess.run(q.size())
    for i in range(quelen):
        print(sess.run(q.dequeue()))
           
TensorFlow(五)隊列與線程

同時需要注意的是隊列會由于一些例如 隊列滿進入隊列 、 資料未能彈出 等原因堵塞 , 直到堵塞的原因解除。

入隊操作都在主線程中進行,Session中可以多個線程一起運作。 在資料輸入的應用場景中,入隊操作從硬碟上讀取,入隊操作是從硬碟中讀取輸入,放到記憶體當中,速度較慢。 使用QueueRunner可以建立一系列新的線程進行入隊操作,讓主線程繼續使用資料。如果在訓練神經網絡的場景中,就是訓練網絡和讀取資料是異步的,主線程在訓練網絡,另一個線程在将資料從硬碟讀入記憶體。同時tf提供了 QueueRunner 函數 用來解決異步問題 , 可以建立一系列線程同時進入主線程操作 , 資料的讀取和操作是同步的 , 即主線程在進行模型的訓練同時将資料讀入。

import tensorflow as tf
with tf.Session() as sess:
    
    q = tf.FIFOQueue(1000 , "float32")
    
    counter = tf.Variable(0.0)
    
    #第一個任務 counter為計數器 不斷計數
    add_op = tf.assign_add(counter , tf.constant(1.0))
    
    #第二個任務 将計數結果不斷入隊
    enqueueData_op = q.enqueue(counter)
    
    #建立四個線程完成 計數入隊的操作
    qr = tf.train.QueueRunner(q , enqueue_ops = [add_op , enqueueData_op] * 4)
    sess.run(tf.global_variables_initializer())
    
    #啟動入隊線程
    enqueue_threads = qr.create_threads(sess , start = True)
    
    for i in range(10):
        #出隊
        print(sess.run(q.dequeue()))
           
TensorFlow(五)隊列與線程

此時會出現報錯 錯誤:tensorflow:QueueRunner中的異常:會話已關閉。

當main循環結束後,該Session就會自動關閉,導緻線程無法繼續。

import tensorflow as tf
with tf.Session() as sess:
    
    q = tf.FIFOQueue(1000 , "float32")
    
    counter = tf.Variable(0.0)
    
    #第一個任務 counter為計數器 不斷計數
    add_op = tf.assign_add(counter , tf.constant(1.0))
    
    #第二個任務 将計數結果不斷入隊
    enqueueData_op = q.enqueue(counter)
    
    #啟動Session
    sess = tf.Session()
    #建立四個線程完成 計數入隊的操作
    qr = tf.train.QueueRunner(q , enqueue_ops = [add_op , enqueueData_op] * 4)
    sess.run(tf.global_variables_initializer())
    
    #啟動入隊線程
    enqueue_threads = qr.create_threads(sess , start = True)
    
    for i in range(10):
        #出隊
        print(sess.run(q.dequeue()))
           

而先将sess初始化為Session,便不會報錯主要原因是因為tensorflow是在圖上進行計算,要驅動一張圖進行計算,必須要送入資料,如果說資料沒有送進去,那麼sess.run(),就無法執行,tf也不會主動報錯,提示沒有資料送進去,其實tf也不能主動報錯,因為tf的訓練過程和讀取資料的過程其實是異步的。tf會一直挂起,等待資料準備好。現象就是tf的程式不報錯,但是一直不動,跟挂起類似。 

多個線程時由于一個線程與另一個線程會有沖突的操作,導緻會話出現錯誤,為了解決同步問題,提供Coordinator和QueueRunner函數來對線程進行控制和協調。來共同協作停止線程

import tensorflow as tf
with tf.Session() as sess:
    
    q = tf.FIFOQueue(1000 , "float32")
    
    counter = tf.Variable(0.0)
    
    #第一個任務 counter為計數器 不斷計數
    add_op = tf.assign_add(counter , tf.constant(1.0))
    
    #第二個任務 将計數結果不斷入隊
    enqueueData_op = q.enqueue(counter)
    
    #啟動Session
    sess = tf.Session()
    #建立四個線程完成 計數入隊的操作
    qr = tf.train.QueueRunner(q , enqueue_ops = [add_op , enqueueData_op] * 4)
    sess.run(tf.global_variables_initializer())
    
    #啟動入隊線程
    enqueue_threads = qr.create_threads(sess , start = True)
    
    #線程協調器 協調線程之間的關系
    coord = tf.train.Coordinator()
    enqueue_threads = qr.create_threads(sess , coord = coord , start = True)
    
    for i in range(10):
        #出隊
        print(sess.run(q.dequeue()))
        
    coord.request_stop()
    coord.join(enqueue_threads)
           
首先需要思考的一個問題是,什麼是資料讀取?以圖像資料為例,讀取資料的過程可以用下圖來表示:
TensorFlow(五)隊列與線程

假設我們的硬碟中有一個圖檔資料集0001.jpg,0002.jpg,0003.jpg……我們隻需要把它們讀取到記憶體中,然後提供給GPU或是CPU進行計算就可以了。這聽起來很容易,但事實遠沒有那麼簡單。事實上,我們必須要把資料先讀入後才能進行計算,假設讀入用時0.1s,計算用時0.9s,那麼就意味着每過1s,GPU都會有0.1s無事可做,這就大大降低了運算的效率。

如何解決這個問題?方法就是将讀入資料和計算分别放在兩個線程中,将資料讀入記憶體的一個隊列,如下圖所示:

TensorFlow(五)隊列與線程

讀取線程源源不斷地将檔案系統中的圖檔讀入到一個記憶體的隊列中,而負責計算的是另一個線程,計算需要資料時,直接從記憶體隊列中取就可以了。這樣就可以解決GPU因為IO而空閑的問題!

而在tensorflow中,為了友善管理,在記憶體隊列前又添加了一層所謂的“檔案名隊列”。

為什麼要添加這一層檔案名隊列?我們首先得了解機器學習中的一個概念:epoch。對于一個資料集來講,運作一個epoch就是将這個資料集中的圖檔全部計算一遍。如一個資料集中有三張圖檔A.jpg、B.jpg、C.jpg,那麼跑一個epoch就是指對A、B、C三張圖檔都計算了一遍。兩個epoch就是指先對A、B、C各計算一遍,然後再全部計算一遍,也就是說每張圖檔都計算了兩遍。

tensorflow使用檔案名隊列+記憶體隊列雙隊列的形式讀入檔案,可以很好地管理epoch。下面我們用圖檔的形式來說明這個機制的運作方式。如下圖,還是以資料集A.jpg, B.jpg, C.jpg為例,假定我們要跑一個epoch,那麼我們就在檔案名隊列中把A、B、C各放入一次,并在之後标注隊列結束。

TensorFlow(五)隊列與線程
程式運作後,記憶體隊列首先讀入A(此時A從檔案名隊列中出隊):
TensorFlow(五)隊列與線程
再依次讀入B和C:
TensorFlow(五)隊列與線程

此時,如果再嘗試讀入,系統由于檢測到了“結束”,就會自動抛出一個異常(OutOfRange)。外部捕捉到這個異常後就可以結束程式了。這就是tensorflow中讀取資料的基本機制。如果我們要跑2個epoch而不是1個epoch,那隻要在檔案名隊列中将A、B、C依次放入兩次再标記結束就可以了。

典型的檔案資料讀取會包含下面這些步驟:

(1)檔案名清單

可以使用字元串張量(比如

["file0", "file1"]

[("file%d" % i) for i in range(2)]

, 

[("file%d" % i) for i in range(2)]

) 或者

tf.train.match_filenames_once

 ()函數來産生檔案名清單。
filenames = [os.path.join(data_dir, 'data_batch_%d.bin' % i)
                 for i in xrange(1, 6)]      

(2)檔案名隊列

對于檔案名隊列,我們使用tf.train.string_input_producer()函數。這個函數需要傳入一個檔案名list,系統會自動将它轉為一個先入先出的檔案名隊列, 檔案閱讀器會需要它來讀取資料。

# 同時打開多個檔案,顯示建立Queue,同時隐含了QueueRunner的建立
filename_queue = tf.train.string_input_producer(filenames)      

(3)可配置的 檔案名亂序(shuffling),可配置的最大訓練疊代數(epoch limit)

tf.train.string_input_producer還有兩個重要的參數,一個是num_epochs,它就是我們上文中提到的epoch數。另外一個就是shuffle,shuffle是指在一個epoch内檔案的順序是否被打亂。若設定shuffle=False,如下圖,每個epoch内,資料還是按照A、B、C的順序進入檔案名隊列,這個順序不會改變:

TensorFlow(五)隊列與線程
如果設定shuffle=True,那麼在一個epoch内,資料的前後順序就會被打亂,如下圖所示:
TensorFlow(五)隊列與線程

在tensorflow中,記憶體隊列不需要我們自己建立,我們隻需要使用reader對象從檔案名隊列中讀取資料就可以了。

(4)針對輸入檔案格式的閱讀器

根據你的檔案格式, 選擇對應的檔案閱讀器, 然後将檔案名隊列提供給閱讀器的

read()

方法。閱讀器的

read()

方法會輸出一個key來表征輸入的檔案和其中的紀錄(對于調試非常有用),同時得到一個字元串标量, 這個字元串标量可以被一個或多個解析器,或者轉換操作将其解碼為張量并且構造成為樣本。

繼續閱讀