本筆記基于tensorflow-2版本,先貢獻代碼或者下載下傳代碼(可能需要科學上網)。
什麼是圖像分割
在圖像分類任務中,網絡為每個輸入圖像配置設定一個标簽(或類别)。然而,假設你想知道該物體的形狀,哪個像素屬于哪個物體,等等。在這種情況下,你會想給圖像的每個像素配置設定一個類别。這項任務被稱為分割。一個分割模型會傳回關于圖像的更詳細的資訊。圖像分割在醫學成像、自動駕駛汽車和衛星成像方面有許多應用,舉例說明一下。
這裡有一個資料集,該資料集由37個寵物品種的圖像組成,每個品種有200張圖像(在訓練和測試部分各約100張)。每張圖檔都包括相應的标簽和像素級的掩碼。掩碼是每個像素的類别标簽。每個像素被賦予三個類别中的一個:
- 第一類:表現寵物的像素
- 第二類:與寵物邊緣接壤的像素
-
第三類:以上都不是的像素
該例子可以通過下面的程式安裝:
pip install git+https://github.com/tensorflow/examples.git
如果無法連結GitHub或者連結網速很慢,可以通過這個鏡像進行安裝,同樣其它任何GitHub的包都可以使用這個鏡像進行下載下傳或者安裝:
pip install git+https://github.com.cnpmjs.org/tensorflow/examples.git
導入TensorFlow相關包:
import tensorflow as tf
from tensorflow.keras.layers.experimental import preprocessing
import tensorflow_datasets as tfds
from tensorflow_examples.models.pix2pix import pix2pix
from IPython.display import clear_output
import matplotlib.pyplot as plt
下載下傳Oxford-IIIT寵物資料(下載下傳大小773.52M,資料集大小774.69M):
也可以手動下載下傳。
圖檔展示:
此外,圖像顔色值被歸一化為[0,1]範圍。最後,如上所述,分割中的像素被标記為{1,2,3}。為了友善起見,将标記減去1,得出的标簽是: {0, 1, 2}:
def normalize(input_image, input_mask):
input_image = tf.cast(input_image, tf.float32) / 255.0
input_mask -= 1
return input_image, input_mask
整理:
def load_image(datapoint):
input_image = tf.image.resize(datapoint['image'], (128, 128))
input_mask = tf.image.resize(datapoint['segmentation_mask'], (128, 128))
input_image, input_mask = normalize(input_image, input_mask)
return input_image, input_mask
資料集已經被分割成測試集和訓練集,是以不需要對其再分割,直接使用即可:
TRAIN_LENGTH = info.splits['train'].num_examples
BATCH_SIZE = 64
BUFFER_SIZE = 1000
STEPS_PER_EPOCH = TRAIN_LENGTH // BATCH_SIZE
下面的類執行了一個簡單的擴增,即對一個圖像進行随機翻轉操作:
class Augment(tf.keras.layers.Layer):
def __init__(self, seed=42):
super().__init__()
# both use the same seed, so they'll make the same randomn changes.
self.augment_inputs = preprocessing.RandomFlip(mode="horizontal", seed=seed)
self.augment_labels = preprocessing.RandomFlip(mode="horizontal", seed=seed)
def call(self, inputs, labels):
inputs = self.augment_inputs(inputs)
labels = self.augment_labels(labels)
return inputs, labels
建構輸入的pipeline,在對輸入進行批處理後然後使用Augmentation:
train_batches = (
train_images
.cache()
.shuffle(BUFFER_SIZE)
.batch(BATCH_SIZE)
.repeat()
.map(Augment())
.prefetch(buffer_size=tf.data.AUTOTUNE))
test_batches = test_images.batch(BATCH_SIZE)
圖像例子和它的MASK圖:
def display(display_list):
plt.figure(figsize=(15, 15))
title = ['Input Image', 'True Mask', 'Predicted Mask']
for i in range(len(display_list)):
plt.subplot(1, len(display_list), i+1)
plt.title(title[i])
plt.imshow(tf.keras.preprocessing.image.array_to_img(display_list[i]))
plt.axis('off')
plt.show()
for images, masks in train_batches.take(2):
sample_image, sample_mask = images[0], masks[0]
display([sample_image, sample_mask])
下面定義Unet模型
我們這裡使用的是改進的unet網絡模型。一個unet由一個編碼器(下采樣器)和解碼器(上采樣器)組成。為了更穩健的學習到圖像特征并減少可訓練參數的數量,這裡将使用一個預訓練的模型 MobileNetV2作為編碼器。對于解碼器,可以使用上采樣子產品,該子產品已經在TensorFlow執行個體庫中的Pix2pix中實作了。
編碼器是訓練好的MobileNetV2模型,直接調用tf.keras.applications即可。編碼器由模型中間層的特定輸出組成,在訓練過程中不會對編碼器進行訓練。
base_model = tf.keras.applications.MobileNetV2(input_shape=[128, 128, 3], include_top=False)
# Use the activations of these layers
layer_names = [
'block_1_expand_relu', # 64x64
'block_3_expand_relu', # 32x32
'block_6_expand_relu', # 16x16
'block_13_expand_relu', # 8x8
'block_16_project', # 4x4
]
base_model_outputs = [base_model.get_layer(name).output for name in layer_names]
# Create the feature extraction model
down_stack = tf.keras.Model(inputs=base_model.input, outputs=base_model_outputs)
down_stack.trainable = False
解碼器/上采樣隻是在TensorFlow中實作的一系列上采樣子產品。
up_stack = [
pix2pix.upsample(512, 3), # 4x4 -> 8x8
pix2pix.upsample(256, 3), # 8x8 -> 16x16
pix2pix.upsample(128, 3), # 16x16 -> 32x32
pix2pix.upsample(64, 3), # 32x32 -> 64x64
]
def unet_model(output_channels:int):
inputs = tf.keras.layers.Input(shape=[128, 128, 3])
# Downsampling through the model
skips = down_stack(inputs)
x = skips[-1]
skips = reversed(skips[:-1])
# Upsampling and establishing the skip connections
for up, skip in zip(up_stack, skips):
x = up(x)
concat = tf.keras.layers.Concatenate()
x = concat([x, skip])
# This is the last layer of the model
last = tf.keras.layers.Conv2DTranspose(
filters=output_channels, kernel_size=3, strides=2,
padding='same') #64x64 -> 128x128
x = last(x)
return tf.keras.Model(inputs=inputs, outputs=x)
注意:上一層過濾器數量設定為輸出通道數量,這也是該層的輸出通道數。
訓練模型
這個是一個多分類問題,采用CetegforicalCrossentropy(from_logits=True)作為标準的損失函數,即使用 losses.SparseCategoricalCrossentropy(from_logits=True),原因是因為标簽是标量的,而不是每個類的分數向量。
OUTPUT_CLASSES = 3
model = unet_model(output_channels=OUTPUT_CLASSES)
model.compile(optimizer='adam',
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['accuracy'])
模型檢測:
訓練之前測試一下模型:
def create_mask(pred_mask):
pred_mask = tf.argmax(pred_mask, axis=-1)
pred_mask = pred_mask[..., tf.newaxis]
return pred_mask[0]
def show_predictions(dataset=None, num=1):
if dataset:
for image, mask in dataset.take(num):
pred_mask = model.predict(image)
display([image[0], mask[0], create_mask(pred_mask)])
else:
display([sample_image, sample_mask,
create_mask(model.predict(sample_image[tf.newaxis, ...]))])
輸出:
下面定義回報,使得在訓練過程中提高精确度:
class DisplayCallback(tf.keras.callbacks.Callback):
def on_epoch_end(self, epoch, logs=None):
clear_output(wait=True)
show_predictions()
print ('\nSample Prediction after epoch {}\n'.format(epoch+1))
EPOCHS = 20
VAL_SUBSPLITS = 5
VALIDATION_STEPS = info.splits['test'].num_examples//BATCH_SIZE//VAL_SUBSPLITS
model_history = model.fit(train_batches, epochs=EPOCHS,
steps_per_epoch=STEPS_PER_EPOCH,
validation_steps=VALIDATION_STEPS,
validation_data=test_batches,
callbacks=[DisplayCallback()])
損失函數:
loss = model_history.history['loss']
val_loss = model_history.history['val_loss']
plt.figure()
plt.plot(model_history.epoch, loss, 'r', label='Training loss')
plt.plot(model_history.epoch, val_loss, 'bo', label='Validation loss')
plt.title('Training and Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss Value')
plt.ylim([0, 1])
plt.legend()
plt.show()
預測:
語義分割資料集可能是高度不平衡的,這就意味着特定類别的像素可以比其他像素的權重更高。由于分割問題可以使用像素級别問題來處理,我們就可以通過權重損失函數來處理不平衡問題。可以參考這裡。
model.fit目前不支援3+次元的資料:
try:
model_history = model.fit(train_batches, epochs=EPOCHS,
steps_per_epoch=STEPS_PER_EPOCH,
class_weight = {0:2.0, 1:2.0, 2:1.0})
assert False
except Exception as e:
print(f"{type(e).__name__}: {e}")
報錯:
是以我們需要自己實作權重。可以參考樣本權重:model.fit可以接受(data, label)格式之外,還接受(data, label, sample_weight)三維樣式。
model.fit可以将sample_weight傳給損失函數和矩陣, 然後樣本權重會乘以樣本。例如:
label = [0,0]
prediction = [[-3., 0], [-3, 0]]
sample_weight = [1, 10]
loss = tf.losses.SparseCategoricalCrossentropy(from_logits=True,
reduction=tf.losses.Reduction.NONE)
loss(label, prediction, sample_weight).numpy()
是以為了生成樣本權重,我們需要一個函數,這個函數輸入(data, label),然後輸出(data, label, sample_weight),其中樣本權重包含每個像素的權重。
最簡單的是将标簽作為權重清單的索引:
def add_sample_weights(image, label):
# The weights for each class, with the constraint that:
# sum(class_weights) == 1.0
class_weights = tf.constant([2.0, 2.0, 1.0])
class_weights = class_weights/tf.reduce_sum(class_weights)
# Create an image of `sample_weights` by using the label at each pixel as an
# index into the `class weights` .
sample_weights = tf.gather(class_weights, indices=tf.cast(label, tf.int32))
return image, label, sample_weights
生成的資料集每個成分都包含了三個元素:
train_batches.map(add_sample_weights).element_spec
現在我們就可以在權重資料集上進行訓練模型:
weighted_model = unet_model(OUTPUT_CLASSES)
weighted_model.compile(
optimizer='adam',
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['accuracy'])
weighted_model.fit(
train_batches.map(add_sample_weights),
epochs=1,
steps_per_epoch=10)