天天看點

18-TFRecord 資料格式化存儲工具(CDBmax 資料國度)

一、寫在前面

  這篇是拖了很久才正式寫的一篇,也算是工程化的應用中比較重要的一個部分,是以在這裡做一個簡要的分享。TFRecord是TensorFlow官方推薦使用的資料格式化存儲工具,可以很大程度上提高TensorFlow訓練過程中的IO效率。我們之前在做一些簡單的訓練的時候都是使用文本存儲的方式,比如之前所作的花卉分類使用的兩萬張圖檔存儲之文本檔案中大約耗費了70個G的存儲空間,而且一次性讀入這麼大的資料量很顯然是一個不現實的事情,就算資料部分成功讀入之後,模型所占記憶體空間也許将無法滿足,就容易出現IOError的狀況,而出現了這種問題解決得辦法不外乎兩種方法,一種是加裝記憶體條或者是加裝同型号顯示卡,另外一種就是使用一種合理的IO讀寫的方式,來降低記憶體或者顯存得開銷。首先第一種方法需要考慮的問題就比較多了,它不僅與個人的财力挂鈎,還與機器可拓展空間和真實空間挂鈎,是以學習一種優化存儲空間的方式就顯得尤為重要。

二、相關原理

  1.TFRecord相對優于一般方法的原因主要有以下幾點:1.TFRecord内部采用“Protocol Buffer”的二進制資料編碼方案,其單個位元組相對于漢字的“utf-8”編碼所占用的位元組數要少的多,因而在生成一次TFRecord之後,模型訓練過程中的資料讀取、加工處理的效率都會有很大的提升空間。2.利用了Threading(線程)和Queues(隊列)從TFRecord中分批次讀取資料,這種方法可以實作一邊讀取資料至隊列,然後一邊使用隊首資料訓練模型的目的。這樣就起到降低了記憶體空間的高占用率的作用。

  2.簡要介紹一下本篇部落格的邏輯和任務,本文主要編寫針對一個簡單的分類資料集通過TFRecord的檔案存儲方式存取至磁盤上,然後通過相應讀寫方法,讀取TFRecord檔案,并做簡單的訓練來熟悉整個流程。

三、相關代碼

  1.使用TFRecord的時候,資料機關一般是tf.train.Example或者是tf.train.SequenceExample,Example一般是用于處理數值、圖像大小固定的資料,可使用該方法指定各特征數值的名稱和資料類型。

示例1:

tf.train.Example(features=tf.train.Features(feature={
    'batch':tf.train.Features(int64_list=tf.train.Int64List(value=[batch])),
    'height':tf.train.Features(int64_list=tf.train.Int64List(value=[height])),
    'weight':tf.train.Features(int64_list=tf.train.Int64List(values=[weight])),
    'channels':tf.train.Features(int64_list=tf.train.Int64List(values=[channels]))
}))
           

  該代碼塊中int64_list(整數清單)可替換為BytesList(字元串清單)和FloatList(實數清單),也就是說Features支援存儲如上三種類型的資料。

  SequenceExample一般是用于處理文本、時間序列沒有固定長度的資料,該部分在NLP中是比較常用的一種資料存儲方式。

示例2:

#定義對象
example=tf.train.SequenceExample()
#定義資料類型和資料量大小
example.context.feature['length'].int64_list.value.append(len(words))
#通過feature_lists來加載資料
word_lists=example.feature_lists.feature_list['word']
for word in words:
    word_lists.feature_add().int64_list.value.append(word_id(word))
           

  2.寫入資料至檔案

  首先建立一個協定記憶體塊(Protocol Buffer),該協定記憶體塊中将用于存放特征屬性[features]。

#1.TFRecord 寫入
#定義Protocol Buffer(協定緩沖器)
def examples(image,label):
    return tf.train.Example(features=tf.train.Features(feature={
        'data_X':tf.train.Feature(bytes_list=tf.train.BytesList(value=[image])),
        'data_Y':tf.train.Feature(bytes_list=tf.train.BytesList(value=[label])),
    }))
           

  然後将擷取到的資料填入到Example記憶體協定塊中,再将協定記憶體塊序列化為一個字元串并且通過

tf.python_io.TFRecordWriter()

方法寫入至TFRecords檔案。

#IO寫入
def writer(train_file,test_file):
    #讀取data_X,data_Y資料集
    train_X,train_Y,test_X,test_Y=divide_data()
    #定義寫train_file IO對象
    writer1=tf.python_io.TFRecordWriter(train_file)
    for data_X,data_Y in zip(train_X,train_Y):
        #print(data_X,'*************')
        #print(data_Y,'*************')
        #轉換資料格式
        print(data_X.shape)
        print(data_Y.shape)
        data_X=data_X.astype(np.float32)
        data_Y=data_Y.astype(np.float32)

        mk_em=examples(data_X.tobytes(),data_Y.tobytes())
        writer1.write(mk_em.SerializeToString())
    writer1.close()
    writer2=tf.python_io.TFRecordWriter(test_file)
    for data_X,data_Y in zip(test_X,test_Y):
        data_X=data_X.astype(np.float32)
        data_Y=data_Y.astype(np.float32)

        mk_em=examples(data_X.tobytes(),data_Y.tobytes())
        writer2.write(mk_em.SerializeToString())
    writer2.close()

if __name__=='__main__':
    writer('train_data.tfrecord','test_data.tfrecord')
           

  運作完整檔案之後,将可以看到在同級目錄下面會生成兩個tfRecord檔案,一個名為’train_data.tfrecord’,該檔案用于模型訓練,另一個名為’test_data.tfrecord’,該檔案作為測試集而存在。

  3.從tfRecord檔案中讀取資料

  首先擷取隊列,并對隊列中的記憶體協定塊進行讀取和解碼,然後将轉換之後的資料組合成一個batch的資料,傳入至模型。用于優化模型參數。代碼塊如下所示:

#讀取batch_size條資料
def read_tfrecord(filename,batch_size=256):
    #擷取隊列
    filename_queue=tf.train.string_input_producer([filename])
    #建構資料讀取器
    reader=tf.TFRecordReader()
    #讀取隊列中的資料
    _,serializer_example=reader.read(filename_queue)

    #處理樣本
    features=tf.parse_single_example(
        serializer_example,
        features={
            'data_X':tf.FixedLenFeature([],tf.string),
            'data_Y':tf.FixedLenFeature([],tf.string)
        }
    )
    #讀取特征
    data_X=tf.decode_raw(features['data_X'],tf.float32)
    data_Y=tf.decode_raw(features['data_Y'],tf.float32)
    
    #格式重定
    data_X=tf.reshape(data_X,[8])
    data_Y=tf.reshape(data_Y,[5])

    #轉換為批次的Tensor對象 capacity表示隊列元素中的最大數量
    data_X,data_Y=tf.train.batch([data_X,data_Y],batch_size=batch_size,capacity=3500)

    return data_X,data_Y
           

  4.建構DNN網絡模型

  這裡建構了三隐藏層、一輸出層、一BN層以及兩層dropout層的DNN網絡模型結構,非線性變換中使用sigmoid函數對資料進行非線性變換的處理,具體代碼如下:

#build model
def model(data_X,data_Y):
    #tf.reset_default_graph()
    #搭建全連接配接網絡
    hidden_1=1024
    hidden_2=256
    hidden_3=32
    print(data_X.get_shape())
    input_m=data_X.get_shape()[1]
    output_m=data_Y.get_shape()[1]
    learning_rate=0.001
    global_step = tf.Variable(0, name='global_step', trainable=False)
    #建構初始化w,b
    with tf.variable_scope('scope',reuse = tf.AUTO_REUSE):
        weights={
            "w1":tf.Variable(tf.get_variable('w1',[input_m,hidden_1],dtype=tf.float32,initializer=tf.random_normal_initializer(mean=0, stddev=0.1))),
            "w2":tf.Variable(tf.random_normal([hidden_1,hidden_2],stddev=0.1),tf.float32,name='w2'),
            "w3":tf.Variable(tf.random_normal([hidden_2,hidden_3],stddev=0.1),tf.float32,name='w3'),
            "out":tf.Variable(tf.get_variable('out',[hidden_3,output_m],dtype=tf.float32,initializer=tf.random_normal_initializer(mean=0, stddev=0.1)))
        }
        b={
            'b1':tf.Variable(tf.zeros([hidden_1]),tf.float32,name='b1'),
            'b2':tf.Variable(tf.zeros([hidden_2]),tf.float32,name='b2'),
            'b3':tf.Variable(tf.zeros([hidden_3]),tf.float32,name='b3'),
            'b4':tf.Variable(tf.zeros([output_m]),tf.float32,name='b4')
        }
    layer0=batch_norm(data_X,decay=0.9,updates_collections=None,is_training=True)
    layer1=tf.nn.sigmoid(tf.add(tf.matmul(layer0,weights['w1']),b['b1']))
    dropout1=tf.nn.dropout(layer1,keep_prob=0.75)
    layer2=tf.nn.sigmoid(tf.add(tf.matmul(dropout1,weights['w2']),b['b2']))
    dropout2=tf.nn.dropout(layer2,keep_prob=0.75)
    layer3=tf.nn.sigmoid(tf.add(tf.matmul(dropout2,weights['w3']),b['b3']))
    out=tf.add(tf.matmul(layer3,weights['out']),b['b4'])
    #擷取softmax的分類機率值
    predict=tf.nn.softmax(out,name='output')
    #計算交叉熵損失函數
    loss=tf.reduce_mean(-tf.reduce_sum(data_Y*tf.log(predict),axis=1))
    
    #使用梯度下降求解,最小化誤差
    #train=tf.train.GradientDescentOptimizer(learning_rate=learning_rate).minimize(loss)
    train=tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss)
    #計算正确率
    equal=tf.equal(tf.argmax(predict,axis=1),tf.argmax(data_Y,axis=1))
    correct_rate=tf.reduce_mean(tf.cast(equal,tf.float32))
    return train,loss,correct_rate,global_step
           

  5.分批次讀取資料并訓練DNN模型

#train
def train():
    checkpoint_dir='./model'
    save_time=500
    training_epochs=100000
    display_time=5
    # 讀取訓練集 TFRecord檔案Tensor對象
    train_X, train_Y = read_tfrecord('train_data.tfrecord')

    #建構傳回的訓練器
    train,loss,correct_rate,global_step=model(train_X,train_Y)
    # 讀取測試集 TFRecord檔案Tensor對象
    test_X,test_Y=read_tfrecord('test_data.tfrecord',batch_size=1000)
    _,loss_test,correct_rate_test,_=model(test_X,test_Y)
    saver=tf.train.Saver(max_to_keep=2)
    with tf.Session(config=tf.ConfigProto(log_device_placement=False,allow_soft_placement=True)) as sess:
        sess.run(tf.global_variables_initializer())
        ckpt=None
        if True:
            #加載模型繼續訓練
            ckpt=tf.train.latest_checkpoint(checkpoint_dir)
            if ckpt:
                print("load model …………")
                saver.restore(sess,ckpt)
        #開啟線程
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        #訓練
        for epoch in range(sess.run(global_step),training_epochs):
            _,loss_,correct_rate_=sess.run([train,loss,correct_rate])
            if (epoch+1)%display_time==0:
                print('step:{},loss:{},correct_rate:{}'.format(epoch+1,loss_,correct_rate_))
                loss_test_,correct_rate_test_=sess.run([loss_test,correct_rate_test])
                print('testing:loss:{},correct_rate:{}'.format(loss_test_,correct_rate_test_))
            sess.run(tf.assign(global_step, epoch + 1))
            if (epoch+1)%save_time==0:
                print('save model …………')
                saver.save(sess,'./model/model.ckpt',global_step=global_step)
            
        coord.request_stop()
        coord.join(threads)
        
if __name__=='__main__':
    train()
           

三、廣而告之

18-TFRecord 資料格式化存儲工具(CDBmax 資料國度)

當你在進行資料統計分析,模型建立遇到困難的時候,那麼請點開這個連結吧:

https://shop163287636.taobao.com/?spm=a230r.7195193.1997079397.2.b79b4e98VwGtpt

四、總結

  1.使用該方法是具有一定局限性的,因為其僅可以順序從tfRecord檔案中讀取,是以需要對樣本的資料量有一定的要求,這個要求當然是資料量越大越好,不然模型很容易過拟合。

  2.該部分完整代碼見 https://download.csdn.net/download/qq_37972530/10887231

繼續閱讀