Tensorflow卷積網絡實作對CIFAR圖像的分類
- CIFAR資料集簡介
- 下載下傳資料集
- 導入資料集
- 顯示資料集資訊
- 資料預處理
-
- 圖像資料預處理
- 标簽資料預處理——獨熱編碼
- 定義共享參數
- 定義網絡結構
- 構模組化型
- 定義準确率
- 定義傳回下一個epoch的函數
- 訓練模型
- 損失(準确率)可視化
CIFAR資料集簡介
該資料集共有60000張彩色圖像,這些圖像是32*32,分為10個類,每類6000張圖。這裡面有50000張用于訓練,構成了5個訓練批,每一批10000張圖;另外10000用于測試,單獨構成一批。測試批的資料裡,取自10類中的每一類,每一類随機取1000張。抽剩下的就随機排列組成了訓練批。注意一個訓練批中的各類圖像并不一定數量相同,總的來看訓練批,每一類都有5000張圖。
下面這幅圖就是列舉了10各類,每一類展示了随機的10張圖檔:
下載下傳資料集
import urllib.request
import os
import tarfile
import tensorflow as tf
# 建立檔案夾
filepath = 'data/'
if not os.path.exists(filepath):
os.mkdir(filepath)
print('建立成功')
else:
print("檔案夾已存在")
# 下載下傳資料檔案
url = 'https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz'
filepath = 'data/cifar-10-python.tar.gz'
if not os.path.isfile(filepath):
result = urllib.request.urlretrieve(url, filepath)
print('downlanded: ',result)
else:
print('Data file already exists')
# 解壓資料檔案
if not os.path.exists("data/cifar-10-batches-py"):
tflie = tarfile.open("data/cifar-10-python.tar.gz","r:gz")
result = tflie.extractall('data/')
print('Extracted to ./data/cifar-10-batches-py')
print('Finished it')
else:
print('Directory already exists')
導入資料集
import os
import numpy as np
import pickle as p
def load_CIFAR_batch(filename):
with open(filename,'rb')as f:
data_dict = p.load(f, encoding='bytes')
images = data_dict[b'data']
labels = data_dict[b'labels']
# 把原始資料結構調整為:BCWH
images = images.reshape(10000,3,32,32)
# tensorflow處理圖像資料的結構:BWHC
# 把通道資料C移動到最後一個次元
images = images.transpose(0,2,3,1)
labels = np.array(labels)
return images,labels
def load_CIFAR_data(data_dir):
images_train = []
labels_train = []
for i in range(5):
f = os.path.join(data_dir,'data_batch_%d'%(i+1))
print('loading ',f)
# 調用load_CIFAR_batch()獲得批量的圖像及其對應的标簽
image_batch,label_batch = load_CIFAR_batch(f)
images_train.append(image_batch)
labels_train.append(label_batch)
Xtrain = np.concatenate(images_train)
Ytrain = np.concatenate(labels_train)
del image_batch,label_batch
Xtest,Ytest = load_CIFAR_batch(os.path.join(data_dir,'test_batch'))
print('finish loadding CIFAR-10 data')
# 傳回訓練集的圖像和标簽,測試集的圖像和标簽
return Xtrain,Ytrain,Xtest,Ytest
data_dir = 'data/cifar-10-batches-py'
Xtrain,Ytrain,Xtest,Ytest = load_CIFAR_data(data_dir)
顯示資料集資訊
print('training data shape:',Xtrain.shape)
print('training labels shape:',Ytrain.shape)
print('test data shape:',Xtest.shape)
print('test labels shape:',Ytest.shape)
得到如下結果:
資料預處理
圖像資料預處理
# 檢視圖像資料資訊
# 顯示第一個圖的第一個像素點
print(Xtrain[0][0][0])
# 歸一化
Xtrain_normalize = Xtrain.astype('float32')/255.0
Xtest_normalize = Xtest.astype('float32')/255.0
print(Xtrain_normalize[0][0][0])
标簽資料預處理——獨熱編碼
from sklearn.preprocessing import OneHotEncoder
encoder = OneHotEncoder(sparse=False,categories='auto')
yy = [[0],[1],[2],[3],[4],[5],[6],[7],[8],[9]]
encoder.fit(yy)
# 轉置
Ytrain_reshpe = Ytrain.reshape(-1,1)
Ytest_reshpe = Ytest.reshape(-1,1)
# 獨熱編碼
Ytrain_onehot = encoder.transform(Ytrain_reshpe)
Ytest_onehot = encoder.transform(Ytest_reshpe)
定義共享參數
# 定義權值
def weight(shape):
return tf.Variable(tf.truncated_normal(shape,stddev=0.1),name='W')
# 定義偏置項
def bias(shape):
return tf.Variable(tf.constant(0.1,shape=shape),name='b')
# 定義卷積操作
# 步長為1,paddding為'SAME',即卷積後圖檔大小不變化
def conv2d(x,W):
return tf.nn.conv2d(x,W,strides=[1,1,1,1],padding='SAME')
# 定義池化操作
# 步長為2,即原尺寸的長和寬除以2
def max_pool(x):
return tf.nn.max_pool(x,ksize=[1,2,2,1],strides=[1,2,2,1],padding='SAME')
定義網絡結構
# 輸入層
# 32x32圖像,通道為3(RGB)
# 用tf.name_scope('inputt_layer')将下面結構打包成input_layer
with tf.name_scope('inputt_layer'):
x = tf.placeholder('float',shape=[None,32,32,3],name='X')
# 第一個卷積層
# 輸入通道:3,輸出通道:32,卷積後圖像尺寸不變,依然是32x32
with tf.name_scope('conv_1'):
W1 = weight([3,3,3,32]) # [k_wight,k_height,input_chn,output_chn]
b1 = bias([32]) # 與output_chn一緻
conv_1 = tf.nn.relu(conv2d(x,W1)+b1)
# 第一個池化層
# 将32x32的圖像縮小為16x16,不改變通道數
with tf.name_scope('pool_1'):
pool_1 = max_pool(conv_1)
# 第二個卷積層
# 輸入通道:32,輸出通道:64,卷積後圖像尺寸不變,依然是32x32
with tf.name_scope('conv_2'):
W2 = weight([3,3,32,64]) # [k_wight,k_height,input_chn,output_chn]
b2 = bias([64]) # 與output_chn一緻
conv_2 = tf.nn.relu(conv2d(pool_1,W2)+b2)
# 第二個池化層
# 将16x16的圖像縮小為8x8,不改變通道數
with tf.name_scope('pool_2'):
pool_2 = max_pool(conv_2)
# 全連接配接層
# 将64個8x8的圖像轉換為一維的向量,長度是64*8*8=4096
# 158個神經元
with tf.name_scope('fc'):
W3 = weight([4096,158])
b3 = bias([158])
flat = tf.reshape(pool_2,[-1,4096])# 将pool_2展開為1行4096列的向量
h = tf.nn.relu(tf.matmul(flat,W3)+b3)
# 使用dropout方法使20%的神經元消失,防止神經網絡過拟合
h_dropout = tf.nn.dropout(h,keep_prob=0.8)
# 輸出層
# 輸出層有10個神經元,對應10個類别
with tf.name_scope('output_layer'):
W4 = weight([158,10])
b4 = bias([10])
pred = tf.nn.softmax(tf.matmul(h_dropout,W4)+b4)
構模組化型
with tf.name_scope('optimizer'):
# 定義占位符
y = tf.placeholder('float',shape=[None,10],name='label')
# 定義損失函數
loss_fun = tf.reduce_mean(
tf.nn.softmax_cross_entropy_with_logits
(logits=pred,labels=y))
# 選擇優化器
lr = 0.0001
opt = tf.train.AdamOptimizer(lr).minimize(loss_fun)
定義準确率
with tf.name_scope('evaluation'):
# tf.argmax(x,1)傳回x中每一行最大值的位置
correct_prediction = tf.equal(tf.argmax(pred,1),tf.argmax(y,1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction,'float'))
其中
tf.argmax(x,1)
傳回x中每一行最大值的位置。
定義傳回下一個epoch的函數
def next_epoch(num, batch_size):
index = np.random.randint(0,num-1,batch_size)
batch_x = Xtrain_normalize[index]
batch_y = Ytrain_onehot[index]
return batch_x,batch_y
訓練模型
import os
from time import time
train_epochs = 50
batch_size = 50
total_batch = int(len(Xtrain)/batch_size)
display_step = 1
epoch_list = []
accuracy_list = []
loss_list = []
# epoch = tf.Variable(0,name='epoch',trainable=False)
startTime = time()
init = tf.global_variables_initializer()
with tf.Session() as sess:
sess.run(init)
for epoch in range(train_epochs):
for i in range(total_batch):
batch_x, batch_y = next_epoch(50000,batch_size)
sess.run(opt,feed_dict={x:batch_x,y:batch_y})
loss,acc = sess.run([loss_fun,accuracy],feed_dict={x:batch_x,y:batch_y})
if epoch%display_step == 0:
print("Train Epoch:%02d" % (epoch+1),"Loss=%f" % loss, "Accuracy=%.4f" % acc)
epoch_list.append(ep+1)
loss_list.append(loss)
accuracy_list.append(acc)
time_cost = time() - startTime
print("Train finished takes:%.4f" % time_cost)
損失(準确率)可視化
%matplotlib inline
import matplotlib.pyplot as plt
fig = plt.gcf()
fig.set_size_inches(4,2)
plt.plot(loss_list,label='loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.title('Loss')
plt.legend(['loss'],loc='upper right')