天天看點

tensorflow實作貓狗大戰(分類算法)本次使用了tensorflow進階API,在規範化網絡程式設計做出了嘗試。第一步:準備好需要的庫 第二步:準備資料集:

本次使用了 TensorFlow 的高階 API(tf.estimator、tf.data、tf.layers),嘗試以較規範的方式編寫網絡程式。

第一步:準備好需要的庫

  • tensorflow-gpu  1.8.0
  • opencv-python     3.3.1
  • numpy
  • scikit-image(匯入時的名稱為 skimage)
  • tqdm

 第二步:準備資料集:

https://www.kaggle.com/c/dogs-vs-cats

我們使用了kaggle的貓狗大戰資料集

tensorflow實作貓狗大戰(分類算法)本次使用了tensorflow進階API,在規範化網絡程式設計做出了嘗試。第一步:準備好需要的庫 第二步:準備資料集:

我們可以看到資料集中,檔案名使用了「類別.編號.檔案類型」的格式來標注(例如 cat.0.jpg、dog.0.jpg)

為了通用以及友善起見,我們對該資料集進行分檔案夾放置:

tensorflow實作貓狗大戰(分類算法)本次使用了tensorflow進階API,在規範化網絡程式設計做出了嘗試。第一步:準備好需要的庫 第二步:準備資料集:

下面是分類放置的代碼:

import os
import shutil

# Destination folders, one per class. NOTE: despite the "train"/"test" names,
# output_train_path receives the cat images and output_test_path the dog images.
output_train_path = '/home/a/Datasets/cat&dog/class/cat'
output_test_path = '/home/a/Datasets/cat&dog/class/dog'

# Create both class folders up front if they are missing.
for _class_dir in (output_train_path, output_test_path):
    if not os.path.exists(_class_dir):
        os.makedirs(_class_dir)

def scanDir_lable_File(dir, flag=True):
    """Copy every ``cat.*`` / ``dog.*`` file found under *dir* into its class folder.

    The label is the part of the file name before the first dot (the data set
    names files ``<label>.<index>.<ext>``). Files labelled ``cat`` go to
    ``output_train_path`` and files labelled ``dog`` go to ``output_test_path``.
    Files with any other prefix are skipped instead of being filed as dogs.

    Args:
        dir: Root directory; it is walked recursively.
        flag: Unused. Kept so existing callers keep working.
    """
    # Despite their names, these module-level paths are the cat and dog folders.
    destinations = {'cat': output_train_path, 'dog': output_test_path}
    for target_dir in destinations.values():
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)

    for root, _subdirs, files in os.walk(dir):  # walk the whole tree
        for f in files:
            img_path = os.path.join(root, f)
            if not os.path.isfile(img_path):
                continue
            lable = os.path.splitext(f)[0].split('.')[0]
            print(lable)
            target_dir = destinations.get(lable)
            if target_dir is None:
                # Unknown prefix (stray file, unlabeled image): do not guess a class.
                print("skip %s: unknown label" % (img_path))
                continue
            mycopyfile(img_path, os.path.join(target_dir, f))

def mycopyfile(srcfile, dstfile):
    """Copy *srcfile* to *dstfile*, creating the destination folder if needed.

    Prints a message and does nothing when *srcfile* is not an existing file.

    Args:
        srcfile: Path of the file to copy.
        dstfile: Path of the copy, including the file name.
    """
    if not os.path.isfile(srcfile):
        print("%s not exist!" % (srcfile))
        return

    fpath = os.path.dirname(dstfile)  # directory part of the destination
    # A bare file name has an empty directory part, and os.makedirs('') raises,
    # so only create the directory when there is one to create.
    if fpath and not os.path.exists(fpath):
        os.makedirs(fpath)
    shutil.copyfile(srcfile, dstfile)
    print("copy %s -> %s" % (srcfile, dstfile))


# Data set root; the train/ and test/ sub-folders are addressed with a trailing slash.
root_path = '/home/a/Datasets/cat&dog'
train_path = '%s/train/' % root_path
test_path = '%s/test/' % root_path

# Only the training images are sorted into class folders here.
scanDir_lable_File(train_path)

接着為了有效使用記憶體資源,我們使用tfrecord來對圖檔進行存儲

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import random
from tqdm import tqdm
import numpy as np
import tensorflow as tf
from skimage import io, transform, color, util

# Command-line flags. Arguments are passed positionally (name, default, help):
# the flag_name/default_value/docstring keywords are deprecated aliases, while
# the positional order is accepted by both the old and the absl-based flag API.
flags = tf.flags
flags.DEFINE_string('directory', '/home/a/Datasets/cat&dog/class', '資料位址')
flags.DEFINE_string('save_dir', './tfrecords', '儲存位址')
flags.DEFINE_integer('test_size', 350, '測試集大小')
FLAGS = flags.FLAGS

# Estimator modes accepted as TFRecord file stems.
MODES = [tf.estimator.ModeKeys.TRAIN, tf.estimator.ModeKeys.EVAL, tf.estimator.ModeKeys.PREDICT]


def _float_feature(value):
    """Wrap a float, or a list of floats, in a ``tf.train.Feature``.

    Args:
        value: A single value or a list of values; a non-list is wrapped in a list.

    Returns:
        A ``tf.train.Feature`` whose ``float_list`` field holds the values.
    """
    if not isinstance(value, list):
        value = [value]
    # A FloatList belongs in the float_list field; the int64_list field only
    # accepts an Int64List message.
    return tf.train.Feature(float_list=tf.train.FloatList(value=value))


def _int_feature(value):
    """Wrap an integer, or a list of integers, in an int64 ``tf.train.Feature``."""
    values = value if isinstance(value, list) else [value]
    int64_list = tf.train.Int64List(value=values)
    return tf.train.Feature(int64_list=int64_list)


def _bytes_feature(value):
    """Wrap a bytes object, or a list of bytes objects, in a bytes ``tf.train.Feature``."""
    values = value if isinstance(value, list) else [value]
    bytes_list = tf.train.BytesList(value=values)
    return tf.train.Feature(bytes_list=bytes_list)


def convert_to_tfrecord(mode, anno):
    """Write (image path, label) pairs to ``<FLAGS.save_dir>/<mode>.tfrecords``.

    Each image is read, converted to grayscale, resized to 224x224 and stored
    as raw float32 bytes together with its height, width, depth and class label.

    Args:
        mode: One of the values in ``MODES``; used as the output file stem.
        anno: Iterable of ``(file_path, class_index)`` pairs.
    """
    assert mode in MODES, "模式錯誤"

    # The writer does not create missing parent folders, so make sure the
    # output directory exists before opening the file.
    if not os.path.isdir(FLAGS.save_dir):
        os.makedirs(FLAGS.save_dir)

    filename = os.path.join(FLAGS.save_dir, mode + '.tfrecords')

    with tf.python_io.TFRecordWriter(filename) as writer:
        for fnm, cls in tqdm(anno):

            # Read and normalise the image.
            img = io.imread(fnm)
            if img.ndim == 3:
                # Drop a possible alpha channel; an image that is already 2-D is
                # grayscale and needs no colour conversion.
                img = color.rgb2gray(img[..., :3])
            img = transform.resize(img, [224, 224])

            # Dimensions of the array that is actually stored.
            if 3 == img.ndim:
                rows, cols, depth = img.shape
            else:
                rows, cols = img.shape
                depth = 1

            # Build the Example message.
            example = tf.train.Example(
                features=tf.train.Features(
                    feature={
                        'image/height': _int_feature(rows),
                        'image/width': _int_feature(cols),
                        'image/depth': _int_feature(depth),
                        'image/class/label': _int_feature(cls),
                        'image/encoded': _bytes_feature(img.astype(np.float32).tobytes())
                    }
                )
            )
            # Serialise and append to the file.
            writer.write(example.SerializeToString())


def get_folder_name(folder):
    """Return the sorted names of the direct sub-directories of *folder* (no recursion)."""
    subdirs = []
    for entry in os.listdir(folder):
        if os.path.isdir(os.path.join(folder, entry)):
            subdirs.append(entry)
    subdirs.sort()
    return subdirs


def get_file_name(folder):
    """Return the paths (folder joined with name) of the regular files directly in *folder*.

    Sub-directories are not descended into; the order is that of ``os.listdir``.
    """
    paths = []
    for entry in os.listdir(folder):
        candidate = os.path.join(folder, entry)
        if os.path.isfile(candidate):
            paths.append(candidate)
    return paths


def get_annotations(directory, classes):
    """Return a shuffled list of ``(image_path, class_index)`` pairs.

    Args:
        directory: Folder that contains one sub-folder per class.
        classes: Class folder names; a name's position in this sequence is its label.
    """
    annotation = []
    for label, class_name in enumerate(classes):
        class_dir = os.path.join(directory, class_name)
        # Every file of this class folder gets the folder's index as label.
        for path in get_file_name(class_dir):
            annotation.append((path, label))

    # Shuffle in place so the classes are interleaved.
    random.shuffle(annotation)
    return annotation


def main(_):
    """Split the shuffled annotations and write the train and eval TFRecord files.

    The first ``FLAGS.test_size`` annotations become the eval set; the rest
    become the training set.
    """
    class_names = get_folder_name(FLAGS.directory)
    annotation = get_annotations(FLAGS.directory, class_names)

    held_out = FLAGS.test_size
    splits = (
        (tf.estimator.ModeKeys.TRAIN, annotation[held_out:]),
        (tf.estimator.ModeKeys.EVAL, annotation[:held_out]),
    )
    for split_mode, subset in splits:
        convert_to_tfrecord(split_mode, subset)


if __name__ == '__main__':
    # Show INFO-level TensorFlow logs, then parse the flags and call main().
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run(main=main)

在生成tfrecord檔案之後

我們選擇對于tfrecord檔案進行讀取

def input_fn(mode, batch_size=1):
    """Build the ``(images, labels)`` tensors read from ``<FLAGS.data_dir>/<mode>.tfrecords``.

    Args:
        mode: One of the values in ``MODES``; selects the TFRecord file.
        batch_size: Number of examples per batch.

    Returns:
        A tuple ``(images, labels)``; labels are one-hot with ``FLAGS.classes`` entries.

    Raises:
        ValueError: If *mode* is not in ``MODES``.
    """

    def parser(serialized_example):
        """Decode one serialized Example into ``(image, one_hot_label)``."""
        feature_spec = {
            'image/height': tf.FixedLenFeature([], tf.int64),
            'image/width': tf.FixedLenFeature([], tf.int64),
            'image/depth': tf.FixedLenFeature([], tf.int64),
            'image/encoded': tf.FixedLenFeature([], tf.string),
            'image/class/label': tf.FixedLenFeature([], tf.int64),
        }
        parsed = tf.parse_single_example(serialized_example, features=feature_spec)

        # The stored dimensions give the shape of the flat float32 buffer.
        shape = [
            tf.cast(parsed[key], tf.int32)
            for key in ('image/height', 'image/width', 'image/depth')
        ]

        pixels = tf.decode_raw(parsed['image/encoded'], tf.float32)
        image = tf.reshape(pixels, shape) - 0.5  # shift pixel values by -0.5

        label = tf.cast(parsed['image/class/label'], tf.int32)
        return image, tf.one_hot(label, FLAGS.classes)

    if mode not in MODES:
        raise ValueError("Mode 未知")
    tfrecords_file = os.path.join(FLAGS.data_dir, mode + '.tfrecords')

    assert tf.gfile.Exists(tfrecords_file), ('TFRrecords 檔案不存在')

    # Read -> decode -> batch; only the training stream repeats forever.
    dataset = (tf.data.TFRecordDataset([tfrecords_file])
               .map(parser, num_parallel_calls=1)
               .batch(batch_size))
    if mode == tf.estimator.ModeKeys.TRAIN:
        dataset = dataset.repeat()

    images, labels = dataset.make_one_shot_iterator().get_next()
    return images, labels

接着建構自己的網絡:我們使用 tf.layers 來進行建構,該模組對于建構網絡十分友好。我們建立一個簡單的CNN網絡

def my_model(inputs, mode):
    """Small CNN that maps a batch of 224x224x1 images to ``FLAGS.classes`` logits.

    Layout: conv32-pool, conv32-pool, conv64-conv64-pool, dense 1024, dropout,
    dense logits. Three stride-2 poolings reduce 224 to 28, hence the
    28 * 28 * 64 flatten size.

    Args:
        inputs: Image tensor; reshaped to ``[-1, 224, 224, 1]``.
        mode: Estimator mode; dropout is only active in TRAIN mode.
    """
    is_training = (mode == tf.estimator.ModeKeys.TRAIN)

    def conv_relu(x, filters):
        return tf.layers.conv2d(x, filters, [3, 3], padding='same', activation=tf.nn.relu)

    def pool(x):
        return tf.layers.max_pooling2d(x, [2, 2], strides=2)

    x = tf.reshape(inputs, [-1, 224, 224, 1])
    x = pool(conv_relu(x, 32))
    x = pool(conv_relu(x, 32))
    x = pool(conv_relu(conv_relu(x, 64), 64))

    x = tf.reshape(x, [-1, 28 * 28 * 64])
    x = tf.layers.dense(x, 1024, activation=tf.nn.relu)
    x = tf.layers.dropout(x, 0.4, training=is_training)
    return tf.layers.dense(x, FLAGS.classes)

對該網絡進行操作

def my_model_fn(features, labels, mode):
    """Estimator model function: builds predictions, loss, train op and metrics.

    Args:
        features: Batch of input images.
        labels: One-hot labels; not used in PREDICT mode.
        mode: A ``tf.estimator.ModeKeys`` value.

    Returns:
        A ``tf.estimator.EstimatorSpec``. In PREDICT mode it only carries the
        predictions dictionary (keys 'classes' and 'probabilities').
    """
    # Image summary of the inputs.
    tf.summary.image('images', features)

    logits = my_model(features, mode)

    predicted_class = tf.argmax(input=logits, axis=1)
    class_probs = tf.nn.softmax(logits, name='softmax_tensor')
    predictions = {'classes': predicted_class, 'probabilities': class_probs}

    # Prediction needs nothing beyond the predictions themselves.
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)

    loss = tf.losses.softmax_cross_entropy(onehot_labels=labels, logits=logits, scope='loss')
    tf.summary.scalar('train_loss', loss)

    # A train op only exists in TRAIN mode.
    train_op = None
    if mode == tf.estimator.ModeKeys.TRAIN:
        global_step = tf.train.get_or_create_global_step()
        train_op = tf.train.AdamOptimizer(learning_rate=1e-3).minimize(loss, global_step)

    # Streaming metrics; each is a (value, update_op) pair.
    accuracy = tf.metrics.accuracy(
        tf.argmax(labels, axis=1), predicted_class, name='accuracy')

    # NOTE(review): with the default FLAGS.classes of 2, a top-2 hit looks
    # trivially satisfied - confirm this metric is intended.
    in_top2 = tf.nn.in_top_k(class_probs, tf.argmax(labels, axis=1), 2)
    accuracy_topk = tf.metrics.mean(in_top2, name='accuracy_topk')

    # Scalar summaries are fed from the metric update ops.
    _, accuracy_update = accuracy
    _, accuracy_topk_update = accuracy_topk
    tf.summary.scalar('train_accuracy', accuracy_update)
    tf.summary.scalar('train_accuracy_topk', accuracy_topk_update)

    return tf.estimator.EstimatorSpec(
        mode=mode,
        predictions=predictions,
        loss=loss,
        train_op=train_op,
        eval_metric_ops={
            'test_accuracy': accuracy,
            'test_accuracy_topk': accuracy_topk
        })

訓練該網絡

def main(_):
    """Run 20 rounds of training (``FLAGS.steps`` steps each) followed by evaluation."""
    # Tensors logged every 100 iterations, referenced by graph name.
    tensors_to_log = {
        'accuracy': 'accuracy/value',
        'accuracy_topk': 'accuracy_topk/value',
        'loss': 'loss/value'
    }
    logging_hook = tf.train.LoggingTensorHook(tensors=tensors_to_log, every_n_iter=100)

    model = tf.estimator.Estimator(model_fn=my_model_fn, model_dir=FLAGS.model_dir)

    def train_input_fn():
        return input_fn(tf.estimator.ModeKeys.TRAIN, FLAGS.batch_size)

    def eval_input_fn():
        return input_fn(tf.estimator.ModeKeys.EVAL)

    for _round in range(20):
        model.train(input_fn=train_input_fn, steps=FLAGS.steps, hooks=[logging_hook])

        # Evaluate after every training round and print the metrics.
        print("=" * 10, "Testing", "=" * 10)
        eval_results = model.evaluate(input_fn=eval_input_fn)
        print('Evaluation results:\n\t{}'.format(eval_results))
        print("=" * 30)


if __name__ == '__main__':
    # Show INFO-level TensorFlow logs, then parse the flags and call main().
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run(main=main)

下面是main的總體代碼:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import tensorflow as tf

# Command-line flags. Arguments are passed positionally (name, default, help):
# the flag_name/default_value/docstring keywords are deprecated aliases, while
# the positional order is accepted by both the old and the absl-based flag API.
flags = tf.app.flags
flags.DEFINE_integer('batch_size', 16, 'Batch 大小')
flags.DEFINE_string('data_dir', './tfrecords', '資料存放位置')
flags.DEFINE_string('model_dir', './cat&dog_model', '模型存放位置')
flags.DEFINE_integer('steps', 1000, '訓練步數')
flags.DEFINE_integer('classes', 2, '類别數量')
FLAGS = flags.FLAGS

# Estimator modes accepted as TFRecord file stems.
MODES = [tf.estimator.ModeKeys.TRAIN, tf.estimator.ModeKeys.EVAL, tf.estimator.ModeKeys.PREDICT]


def input_fn(mode, batch_size=1):
    """Build the ``(images, labels)`` tensors read from ``<FLAGS.data_dir>/<mode>.tfrecords``.

    Args:
        mode: One of the values in ``MODES``; selects the TFRecord file.
        batch_size: Number of examples per batch.

    Returns:
        A tuple ``(images, labels)``; labels are one-hot with ``FLAGS.classes`` entries.

    Raises:
        ValueError: If *mode* is not in ``MODES``.
    """

    def parser(serialized_example):
        """Decode one serialized Example into ``(image, one_hot_label)``."""
        feature_spec = {
            'image/height': tf.FixedLenFeature([], tf.int64),
            'image/width': tf.FixedLenFeature([], tf.int64),
            'image/depth': tf.FixedLenFeature([], tf.int64),
            'image/encoded': tf.FixedLenFeature([], tf.string),
            'image/class/label': tf.FixedLenFeature([], tf.int64),
        }
        parsed = tf.parse_single_example(serialized_example, features=feature_spec)

        # The stored dimensions give the shape of the flat float32 buffer.
        shape = [
            tf.cast(parsed[key], tf.int32)
            for key in ('image/height', 'image/width', 'image/depth')
        ]

        pixels = tf.decode_raw(parsed['image/encoded'], tf.float32)
        image = tf.reshape(pixels, shape) - 0.5  # shift pixel values by -0.5

        label = tf.cast(parsed['image/class/label'], tf.int32)
        return image, tf.one_hot(label, FLAGS.classes)

    if mode not in MODES:
        raise ValueError("Mode 未知")
    tfrecords_file = os.path.join(FLAGS.data_dir, mode + '.tfrecords')

    assert tf.gfile.Exists(tfrecords_file), ('TFRrecords 檔案不存在')

    # Read -> decode -> batch; only the training stream repeats forever.
    dataset = (tf.data.TFRecordDataset([tfrecords_file])
               .map(parser, num_parallel_calls=1)
               .batch(batch_size))
    if mode == tf.estimator.ModeKeys.TRAIN:
        dataset = dataset.repeat()

    images, labels = dataset.make_one_shot_iterator().get_next()
    return images, labels


def my_model(inputs, mode):
    """Small CNN that maps a batch of 224x224x1 images to ``FLAGS.classes`` logits.

    Layout: conv32-pool, conv32-pool, conv64-conv64-pool, dense 1024, dropout,
    dense logits. Three stride-2 poolings reduce 224 to 28, hence the
    28 * 28 * 64 flatten size.

    Args:
        inputs: Image tensor; reshaped to ``[-1, 224, 224, 1]``.
        mode: Estimator mode; dropout is only active in TRAIN mode.
    """
    is_training = (mode == tf.estimator.ModeKeys.TRAIN)

    def conv_relu(x, filters):
        return tf.layers.conv2d(x, filters, [3, 3], padding='same', activation=tf.nn.relu)

    def pool(x):
        return tf.layers.max_pooling2d(x, [2, 2], strides=2)

    x = tf.reshape(inputs, [-1, 224, 224, 1])
    x = pool(conv_relu(x, 32))
    x = pool(conv_relu(x, 32))
    x = pool(conv_relu(conv_relu(x, 64), 64))

    x = tf.reshape(x, [-1, 28 * 28 * 64])
    x = tf.layers.dense(x, 1024, activation=tf.nn.relu)
    x = tf.layers.dropout(x, 0.4, training=is_training)
    return tf.layers.dense(x, FLAGS.classes)


def my_model_fn(features, labels, mode):
    """Estimator model function: builds predictions, loss, train op and metrics.

    Args:
        features: Batch of input images.
        labels: One-hot labels; not used in PREDICT mode.
        mode: A ``tf.estimator.ModeKeys`` value.

    Returns:
        A ``tf.estimator.EstimatorSpec``. In PREDICT mode it only carries the
        predictions dictionary (keys 'classes' and 'probabilities').
    """
    # Image summary of the inputs.
    tf.summary.image('images', features)

    logits = my_model(features, mode)

    predicted_class = tf.argmax(input=logits, axis=1)
    class_probs = tf.nn.softmax(logits, name='softmax_tensor')
    predictions = {'classes': predicted_class, 'probabilities': class_probs}

    # Prediction needs nothing beyond the predictions themselves.
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)

    loss = tf.losses.softmax_cross_entropy(onehot_labels=labels, logits=logits, scope='loss')
    tf.summary.scalar('train_loss', loss)

    # A train op only exists in TRAIN mode.
    train_op = None
    if mode == tf.estimator.ModeKeys.TRAIN:
        global_step = tf.train.get_or_create_global_step()
        train_op = tf.train.AdamOptimizer(learning_rate=1e-3).minimize(loss, global_step)

    # Streaming metrics; each is a (value, update_op) pair.
    accuracy = tf.metrics.accuracy(
        tf.argmax(labels, axis=1), predicted_class, name='accuracy')

    # NOTE(review): with the default FLAGS.classes of 2, a top-2 hit looks
    # trivially satisfied - confirm this metric is intended.
    in_top2 = tf.nn.in_top_k(class_probs, tf.argmax(labels, axis=1), 2)
    accuracy_topk = tf.metrics.mean(in_top2, name='accuracy_topk')

    # Scalar summaries are fed from the metric update ops.
    _, accuracy_update = accuracy
    _, accuracy_topk_update = accuracy_topk
    tf.summary.scalar('train_accuracy', accuracy_update)
    tf.summary.scalar('train_accuracy_topk', accuracy_topk_update)

    return tf.estimator.EstimatorSpec(
        mode=mode,
        predictions=predictions,
        loss=loss,
        train_op=train_op,
        eval_metric_ops={
            'test_accuracy': accuracy,
            'test_accuracy_topk': accuracy_topk
        })


def main(_):
    """Run 20 rounds of training (``FLAGS.steps`` steps each) followed by evaluation."""
    # Tensors logged every 100 iterations, referenced by graph name.
    tensors_to_log = {
        'accuracy': 'accuracy/value',
        'accuracy_topk': 'accuracy_topk/value',
        'loss': 'loss/value'
    }
    logging_hook = tf.train.LoggingTensorHook(tensors=tensors_to_log, every_n_iter=100)

    model = tf.estimator.Estimator(model_fn=my_model_fn, model_dir=FLAGS.model_dir)

    def train_input_fn():
        return input_fn(tf.estimator.ModeKeys.TRAIN, FLAGS.batch_size)

    def eval_input_fn():
        return input_fn(tf.estimator.ModeKeys.EVAL)

    for _round in range(20):
        model.train(input_fn=train_input_fn, steps=FLAGS.steps, hooks=[logging_hook])

        # Evaluate after every training round and print the metrics.
        print("=" * 10, "Testing", "=" * 10)
        eval_results = model.evaluate(input_fn=eval_input_fn)
        print('Evaluation results:\n\t{}'.format(eval_results))
        print("=" * 30)


if __name__ == '__main__':
    # Show INFO-level TensorFlow logs, then parse the flags and call main().
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run(main=main)

在訓練完成後,我們對結果進行預測:

"""Run inference with the trained cat-vs-dog CNN classifier using the tf.estimator API."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import os
import sys
import tensorflow as tf
import train
from skimage import io, transform, color, util

# Image to classify and the folder holding the trained checkpoints.
image_files = '/home/a/Datasets/cat&dog/test/44.jpg'
model_dir = './cat&dog_model/'

# The three settings below are not referenced by the code visible in this script.
mode = tf.estimator.ModeKeys.PREDICT
_NUM_CLASSES = 2
image_size = [224, 224]
def main(unused_argv):
    """Classify the image at ``image_files`` with the trained model and print the result.

    The Estimator is rebuilt from ``train.my_model_fn`` and restores its weights
    from ``model_dir``.

    Args:
        unused_argv: Command-line arguments passed by ``tf.app.run``; ignored.
    """
    # Using the Winograd non-fused algorithms provides a small performance boost.
    os.environ['TF_ENABLE_WINOGRAD_NONFUSED'] = '1'

    model = tf.estimator.Estimator(
        model_fn=train.my_model_fn,
        model_dir=model_dir)

    def predict_input_fn(image_path):
        """Load one image and return it as a float32 batch of shape [1, 224, 224, 1]."""
        img = io.imread(image_path)
        img = color.rgb2gray(img)
        img = transform.resize(img, [224, 224])
        # Same -0.5 shift as the training parser. The trailing channel axis
        # gives the batch the 4-D layout the model function's image summary expects.
        image = img[:, :, None] - 0.5
        # Float input: this only casts to float32, no rescaling takes place.
        images = tf.image.convert_image_dtype(image, dtype=tf.float32)
        # Yield the bare tensor: a tuple returned by an input_fn is interpreted
        # as (features, labels), so a 1-tuple should not be used here.
        dataset = tf.data.Dataset.from_tensors(images)
        return dataset.batch(1).make_one_shot_iterator().get_next()

    def predict(image_path):
        """Print the raw prediction and the winning class with its probability."""
        result = model.predict(input_fn=lambda: predict_input_fn(image_path=image_path))
        for r in result:
            print(r)
            # Class folders are sorted by name when labelled, so 0 is cat and 1 is dog.
            if r['classes'] == 1:
                print('dog', r['probabilities'][1])
            else:
                print('cat', r['probabilities'][0])

    predict(image_files)



if __name__ == '__main__':
    # Show INFO-level TensorFlow logs, then parse the flags and call main().
    tf.logging.set_verbosity(tf.logging.INFO)
    tf.app.run(main)
tensorflow實作貓狗大戰(分類算法)本次使用了tensorflow進階API,在規範化網絡程式設計做出了嘗試。第一步:準備好需要的庫 第二步:準備資料集:
tensorflow實作貓狗大戰(分類算法)本次使用了tensorflow進階API,在規範化網絡程式設計做出了嘗試。第一步:準備好需要的庫 第二步:準備資料集:

因為網絡非常簡單,所以測試精度大概在75%左右

tensorflow實作貓狗大戰(分類算法)本次使用了tensorflow進階API,在規範化網絡程式設計做出了嘗試。第一步:準備好需要的庫 第二步:準備資料集:

這個是最終網絡圖:

tensorflow實作貓狗大戰(分類算法)本次使用了tensorflow進階API,在規範化網絡程式設計做出了嘗試。第一步:準備好需要的庫 第二步:準備資料集: