UNet

Background

  UNet was published at MICCAI in 2015 and was designed specifically for medical image segmentation. Medical images by their nature have relatively simple semantics and fairly fixed structures, the datasets are small, and the data is multi-modal: different CT perfusion protocols yield different modalities. UNet is an effective algorithm for segmenting large images from only a small dataset, and it is named after its U-shaped architecture.

Key Features of UNet

  The network architecture is simple and easy to implement.
  The overlap-tile strategy: medical images are large, so when segmenting a region the network takes a larger surrounding patch as context, and each convolution uses only the valid part of its input, avoiding the uninformative values that padding='same' would introduce. As a result the feature maps shrink, so shallow features must be cropped before they can be concatenated with deep features.
  Random elastic deformation is used for data augmentation, making the model more robust.
  A weighted loss is used, with per-pixel weights related to the distance to the nearest boundary through a Gaussian: the closer a pixel is to a boundary, the larger its weight.
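With 'valid' padding, every 3×3 convolution trims 2 pixels from each spatial dimension and every 2×2 max-pool halves it, which is where the crop sizes used in the implementation (88, 40, 16, and 4 pixels per side) come from. A minimal sketch of that bookkeeping for a 572×572 input:

```python
def valid_conv(size, k=3):
    # A k x k convolution with 'valid' padding removes k - 1 pixels per axis.
    return size - (k - 1)

size = 572
encoder_sizes = []                   # map size after each encoder conv pair
for _ in range(4):
    size = valid_conv(valid_conv(size))
    encoder_sizes.append(size)
    size //= 2                       # 2x2 max-pooling halves the map
size = valid_conv(valid_conv(size))  # bottleneck size: 28

crops = []                           # pixels cropped per side of each skip
for skip in reversed(encoder_sizes):
    size *= 2                        # UpSampling2D((2, 2)) doubles the map
    crops.append((skip - size) // 2)
    size = valid_conv(valid_conv(size))

print(encoder_sizes)  # [568, 280, 136, 64]
print(crops)          # [4, 16, 40, 88]
print(size)           # 388, the final output resolution
```

These are exactly the Cropping2D sizes in the code and the 572 → 388 shrinkage of the original paper.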

UNet Architecture Analysis

(Figure: UNet architecture)

TensorFlow 2.0 Implementation

from functools import reduce
import tensorflow.keras as keras


def compose(*funcs):
    # Left-to-right function composition: compose(f, g)(x) == g(f(x)).
    if funcs:
        return reduce(lambda f, g: lambda *a, **kw: g(f(*a, **kw)), funcs)
    else:
        raise ValueError('Composition of empty sequence not supported.')


def unet(input_shape):
    input_tensor = keras.layers.Input(shape=input_shape, name='input')
    x = input_tensor

    # Encoder: two valid 3x3 convolutions per stage, 2x2 max-pooling in between.
    x1 = compose(keras.layers.Conv2D(64, (3, 3), (1, 1), 'valid', activation='relu', name='encoder_conv1_1'),
                 keras.layers.Conv2D(64, (3, 3), (1, 1), 'valid', activation='relu', name='encoder_conv1_2'))(x)
    x2 = compose(keras.layers.MaxPool2D((2, 2), name='encoder_maxpool1'),
                 keras.layers.Conv2D(128, (3, 3), (1, 1), 'valid', activation='relu', name='encoder_conv2_1'),
                 keras.layers.Conv2D(128, (3, 3), (1, 1), 'valid', activation='relu', name='encoder_conv2_2'))(x1)
    x3 = compose(keras.layers.MaxPool2D((2, 2), name='encoder_maxpool2'),
                 keras.layers.Conv2D(256, (3, 3), (1, 1), 'valid', activation='relu', name='encoder_conv3_1'),
                 keras.layers.Conv2D(256, (3, 3), (1, 1), 'valid', activation='relu', name='encoder_conv3_2'))(x2)
    x4 = compose(keras.layers.MaxPool2D((2, 2), name='encoder_maxpool3'),
                 keras.layers.Conv2D(512, (3, 3), (1, 1), 'valid', activation='relu', name='encoder_conv4_1'),
                 keras.layers.Conv2D(512, (3, 3), (1, 1), 'valid', activation='relu', name='encoder_conv4_2'))(x3)
    x5 = compose(keras.layers.MaxPool2D((2, 2), name='encoder_maxpool4'),
                 keras.layers.Conv2D(1024, (3, 3), (1, 1), 'valid', activation='relu', name='encoder_conv5_1'),
                 keras.layers.Conv2D(1024, (3, 3), (1, 1), 'valid', activation='relu', name='encoder_conv5_2'))(x4)

    # Valid padding shrinks the decoder maps, so the encoder features must be
    # center-cropped before they can be concatenated.
    crop1 = keras.layers.Cropping2D(((88, 88), (88, 88)), name='crop1')(x1)
    crop2 = keras.layers.Cropping2D(((40, 40), (40, 40)), name='crop2')(x2)
    crop3 = keras.layers.Cropping2D(((16, 16), (16, 16)), name='crop3')(x3)
    crop4 = keras.layers.Cropping2D(((4, 4), (4, 4)), name='crop4')(x4)

    # Decoder: upsample, concatenate with the cropped skip connection, convolve.
    y4 = keras.layers.UpSampling2D((2, 2), name='decoder_upsampling4')(x5)
    concatenate4 = keras.layers.Concatenate(name='decoder_concatenate4')([crop4, y4])
    y3 = compose(keras.layers.Conv2D(512, (3, 3), (1, 1), 'valid', activation='relu', name='decoder_conv4_1'),
                 keras.layers.Conv2D(512, (3, 3), (1, 1), 'valid', activation='relu', name='decoder_conv4_2'),
                 keras.layers.UpSampling2D((2, 2), name='decoder_upsampling3'))(concatenate4)
    concatenate3 = keras.layers.Concatenate(name='decoder_concatenate3')([crop3, y3])
    y2 = compose(keras.layers.Conv2D(256, (3, 3), (1, 1), 'valid', activation='relu', name='decoder_conv3_1'),
                 keras.layers.Conv2D(256, (3, 3), (1, 1), 'valid', activation='relu', name='decoder_conv3_2'),
                 keras.layers.UpSampling2D((2, 2), name='decoder_upsampling2'))(concatenate3)
    concatenate2 = keras.layers.Concatenate(name='decoder_concatenate2')([crop2, y2])
    y1 = compose(keras.layers.Conv2D(128, (3, 3), (1, 1), 'valid', activation='relu', name='decoder_conv2_1'),
                 keras.layers.Conv2D(128, (3, 3), (1, 1), 'valid', activation='relu', name='decoder_conv2_2'),
                 keras.layers.UpSampling2D((2, 2), name='decoder_upsampling1'))(concatenate2)
    concatenate1 = keras.layers.Concatenate(name='decoder_concatenate1')([crop1, y1])
    y0 = compose(keras.layers.Conv2D(64, (3, 3), (1, 1), 'valid', activation='relu', name='decoder_conv1_1'),
                 keras.layers.Conv2D(64, (3, 3), (1, 1), 'valid', activation='relu', name='decoder_conv1_2'))(concatenate1)

    # A 1x1 convolution maps the features to per-pixel class scores.
    output = keras.layers.Conv2D(21, (1, 1), (1, 1), 'same', activation='softmax', name='conv_softmax')(y0)

    model = keras.Model(input_tensor, output, name='UNet')

    return model


if __name__ == '__main__':
    model = unet(input_shape=(572, 572, 3))
    model.build(input_shape=(None, 572, 572, 3))
    model.summary()

Complete Walkthrough on the Shape Dataset

Project Layout

  • project
    • shape
      • train_imgs (training images)
      • train_mask (training masks)
      • test_imgs (test images)
    • UNet_weight (model weights)
    • UNet_test_result (test results)
    • UNet.py

Walkthrough Steps

  1. Running a semantic-segmentation experiment is fairly straightforward, since both the training inputs and the labels are images. The label images must first be encoded into class information to match the network's output dimensions, converting from (batch_size, height, width, 1) to (batch_size, height, width, num_class + 1): for each pixel, the channel of its class is set to 1 and all other channels to 0. The network input is therefore (batch_size, height, width, 3) and its output is (batch_size, height, width, num_class + 1).
  2. Design the loss function; in simple cases, plain cross-entropy already works well.
  3. Build the network, choose suitable hyperparameters, and train.
  4. At prediction time, reverse the encoding: for each pixel, the channel with the largest value determines its class. That completes the pipeline.
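Steps 1 and 4 above, one-hot encoding the label image and argmax-decoding the prediction, can be sketched in a few lines of NumPy (the toy 2×2 mask is purely for illustration):

```python
import numpy as np

num_class = 4  # 3 shape classes + 1 background class

# Step 1: encode a (height, width) label image into one-hot channels.
mask = np.array([[0, 1],
                 [2, 3]])                      # toy 2x2 label image
one_hot = np.zeros(mask.shape + (num_class,))
for c in range(num_class):
    one_hot[:, :, c] = (mask == c).astype(float)

# Step 4: decode a prediction back to per-pixel class indices.
decoded = np.argmax(one_hot, axis=-1)
print(decoded)  # recovers the original 2x2 label image
```

In the real pipeline the prediction is the network's softmax output rather than a clean one-hot tensor, but the argmax decoding is identical.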

Tips

  1. The number of image classes is the actual class count + 1, where the extra 1 is the background class. This dataset has 3 classes, so the final channel count is 4, with each channel predicting one class. Softmax is taken along the channel axis and the index of the maximum decides the class: index 0 is background, 1 is circle, 2 is triangle, and 3 is square.
  2. Images used in practice are usually not especially large, so cropping is unnecessary and the convolution padding is changed to 'same'.
  3. Plain cross-entropy suffices for the loss; a weighted loss is computationally heavier and requires computing object edges.
  4. Callbacks are set up for weight checkpointing, learning-rate decay, and early stopping.
  5. The yield keyword produces an iterable, so not all data has to be held in memory at once, which saves a great deal of memory.
  6. The 1000 samples are split into 800 for training, 100 for validation, and 100 for testing; feel free to change the split.
  7. Pay attention to the dimension transformations and the common NumPy and TensorFlow operations, otherwise parts of the code may be hard to follow.
  8. UNet's feature extractor resembles VGG; you can consult the material on feature-extraction networks, swap in a different backbone, and compare parameter counts, speed, and final results.
  9. Input images can first be normalized to [0, 1] or [-1, 1]; since network parameters are generally small, normalization simplifies computation and speeds up convergence.
  10. Real projects usually also need dataset resizing and augmentation. For simplicity none of that is done here, but in your own applications remember to resize or pad images as needed, then apply rotation, contrast enhancement, affine transforms, and so on to make the model more robust. Also note that real-world images are not necessarily named sequentially, so be careful when reading files by name.
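For reference, the weighted loss that tip 3 skips is built in the original paper from a per-pixel weight map driven by the distances d1 and d2 to the two nearest object instances. The sketch below implements only that border term; the brute-force distance computation and the w0 = 10, σ = 5 values are illustrative choices, not part of the code in this post:

```python
import numpy as np

def dist_to_instance(mask):
    """Euclidean distance from every pixel to the nearest pixel of `mask`."""
    ys, xs = np.nonzero(mask)
    pts = np.stack([ys, xs], axis=1).astype(float)            # (N, 2)
    h, w = mask.shape
    grid = np.stack(np.meshgrid(np.arange(h), np.arange(w),
                                indexing='ij'), axis=-1).astype(float)
    diffs = grid[:, :, None, :] - pts[None, None, :, :]       # (h, w, N, 2)
    return np.sqrt((diffs ** 2).sum(-1)).min(-1)              # (h, w)

def border_weight_map(instance_masks, w0=10.0, sigma=5.0):
    """Border term of the UNet weight map: d1, d2 are distances to the
    two nearest instances; weights peak in narrow gaps between objects."""
    distances = np.stack([dist_to_instance(m) for m in instance_masks])
    distances.sort(axis=0)               # per-pixel, smallest first
    d1, d2 = distances[0], distances[1]
    return w0 * np.exp(-((d1 + d2) ** 2) / (2 * sigma ** 2))

# Two toy square instances separated by a two-pixel gap.
a = np.zeros((8, 8)); a[2:6, 1:3] = 1
b = np.zeros((8, 8)); b[2:6, 5:7] = 1
w = border_weight_map([a, b])
# Pixels in the gap carry more weight than pixels far from any border.
print(w[3, 3] > w[0, 0])  # True
```

This is exactly why the computation is heavy: it needs per-instance distance maps over the whole image, which is why the walkthrough sticks with unweighted cross-entropy.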

Complete Walkthrough Code

import os
from functools import reduce
import numpy as np
import cv2 as cv
import tensorflow as tf
import tensorflow.keras as keras


def compose(*funcs):
    # Left-to-right function composition: compose(f, g)(x) == g(f(x)).
    if funcs:
        return reduce(lambda f, g: lambda *a, **kw: g(f(*a, **kw)), funcs)
    else:
        raise ValueError('Composition of empty sequence not supported.')


def small_unet(input_shape, num_class):
    input_tensor = keras.layers.Input(shape=input_shape, name='input')
    x = input_tensor

    # Encoder; with 'same' padding the maps keep their size, so no cropping is needed.
    x1 = compose(keras.layers.Conv2D(32, (3, 3), (1, 1), 'same', activation='relu', name='encoder_conv1_1'),
                 keras.layers.Conv2D(32, (3, 3), (1, 1), 'same', activation='relu', name='encoder_conv1_2'))(x)
    x2 = compose(keras.layers.MaxPool2D((2, 2), name='encoder_maxpool1'),
                 keras.layers.Conv2D(32, (3, 3), (1, 1), 'same', activation='relu', name='encoder_conv2_1'),
                 keras.layers.Conv2D(32, (3, 3), (1, 1), 'same', activation='relu', name='encoder_conv2_2'))(x1)
    x3 = compose(keras.layers.MaxPool2D((2, 2), name='encoder_maxpool2'),
                 keras.layers.Conv2D(64, (3, 3), (1, 1), 'same', activation='relu', name='encoder_conv3_1'),
                 keras.layers.Conv2D(64, (3, 3), (1, 1), 'same', activation='relu', name='encoder_conv3_2'))(x2)
    x4 = compose(keras.layers.MaxPool2D((2, 2), name='encoder_maxpool3'),
                 keras.layers.Conv2D(128, (3, 3), (1, 1), 'same', activation='relu', name='encoder_conv4_1'),
                 keras.layers.Conv2D(128, (3, 3), (1, 1), 'same', activation='relu', name='encoder_conv4_2'))(x3)
    x5 = compose(keras.layers.MaxPool2D((2, 2), name='encoder_maxpool4'),
                 keras.layers.Conv2D(256, (3, 3), (1, 1), 'same', activation='relu', name='encoder_conv5_1'),
                 keras.layers.Conv2D(256, (3, 3), (1, 1), 'same', activation='relu', name='encoder_conv5_2'))(x4)

    # Decoder: upsample and concatenate the encoder features directly.
    y4 = keras.layers.UpSampling2D((2, 2), name='decoder_upsampling4')(x5)
    concatenate4 = keras.layers.Concatenate(name='decoder_concatenate4')([x4, y4])
    y3 = compose(keras.layers.Conv2D(128, (3, 3), (1, 1), 'same', activation='relu', name='decoder_conv4_1'),
                 keras.layers.Conv2D(128, (3, 3), (1, 1), 'same', activation='relu', name='decoder_conv4_2'),
                 keras.layers.UpSampling2D((2, 2), name='decoder_upsampling3'))(concatenate4)
    concatenate3 = keras.layers.Concatenate(name='decoder_concatenate3')([x3, y3])
    y2 = compose(keras.layers.Conv2D(64, (3, 3), (1, 1), 'same', activation='relu', name='decoder_conv3_1'),
                 keras.layers.Conv2D(64, (3, 3), (1, 1), 'same', activation='relu', name='decoder_conv3_2'),
                 keras.layers.UpSampling2D((2, 2), name='decoder_upsampling2'))(concatenate3)
    concatenate2 = keras.layers.Concatenate(name='decoder_concatenate2')([x2, y2])
    y1 = compose(keras.layers.Conv2D(32, (3, 3), (1, 1), 'same', activation='relu', name='decoder_conv2_1'),
                 keras.layers.Conv2D(32, (3, 3), (1, 1), 'same', activation='relu', name='decoder_conv2_2'),
                 keras.layers.UpSampling2D((2, 2), name='decoder_upsampling1'))(concatenate2)
    concatenate1 = keras.layers.Concatenate(name='decoder_concatenate1')([x1, y1])
    y0 = compose(keras.layers.Conv2D(32, (3, 3), (1, 1), 'same', activation='relu', name='decoder_conv1_1'),
                 keras.layers.Conv2D(32, (3, 3), (1, 1), 'same', activation='relu', name='decoder_conv1_2'))(concatenate1)

    output = keras.layers.Conv2D(num_class, (1, 1), (1, 1), 'same', activation='softmax', name='conv_softmax')(y0)

    model = keras.Model(input_tensor, output, name='Small_UNet')

    return model


def generate_arrays_from_file(train_data, batch_size):
    # Total number of samples
    n = len(train_data)
    i = 0
    while 1:
        X_train = []
        Y_train = []
        # Collect one batch of data
        for _ in range(batch_size):
            if i == 0:
                np.random.shuffle(train_data)
            # Read the input image from disk and normalize it to [-1, 1]
            img = cv.imread(imgs_path + '\\' + str(train_data[i]) + '.jpg')
            img = img / 127.5 - 1
            X_train.append(img)

            # Read the mask image and one-hot encode it along the channel axis
            img = cv.imread(mask_path + '\\' + str(train_data[i]) + '.png')
            seg_labels = np.zeros((img_size[0], img_size[1], num_class))
            for c in range(num_class):
                seg_labels[:, :, c] = (img[:, :, 0] == c).astype(int)
            Y_train.append(seg_labels)

            # Start over (and reshuffle) after a full pass through the data
            i = (i + 1) % n
        yield tf.constant(X_train), tf.constant(Y_train)


if __name__ == '__main__':
    # Including the background class
    num_class = 4
    train_data = list(range(800))
    validation_data = list(range(800, 900))
    test_data = range(900, 1000)
    epochs = 50
    batch_size = 16
    tf.random.set_seed(22)
    img_size = (128, 128)
    colors = [[0, 0, 0], [0, 0, 128], [0, 128, 0], [128, 0, 0]]

    mask_path = r'.\shape\train_mask'
    imgs_path = r'.\shape\train_imgs'
    test_path = r'.\shape\test_imgs'
    save_path = r'.\UNet_test_result'
    weight_path = r'.\UNet_weight'

    try:
        os.mkdir(save_path)
    except FileExistsError:
        print(save_path + ' already exists')

    try:
        os.mkdir(weight_path)
    except FileExistsError:
        print(weight_path + ' already exists')

    model = small_unet(input_shape=(img_size[0], img_size[1], 3), num_class=num_class)
    model.build(input_shape=(None, img_size[0], img_size[1], 3))
    model.summary()

    optimizer = keras.optimizers.Adam(learning_rate=1e-3)
    # Categorical cross-entropy matches the one-hot labels and the softmax output.
    loss = keras.losses.CategoricalCrossentropy()

    model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])

    # Checkpointing: save weights every 3 epochs
    checkpoint_period = keras.callbacks.ModelCheckpoint(
        weight_path + '\\' + 'ep{epoch:03d}-loss{loss:.3f}-val_loss{val_loss:.3f}.h5',
        monitor='val_loss',
        save_weights_only=True,
        save_best_only=True,
        period=3
    )

    # Learning-rate schedule: halve the rate when val_loss has not improved for 3 epochs
    reduce_lr = keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=3,
        verbose=1
    )

    # Early stopping: when val_loss stops improving, the model is essentially trained
    early_stopping = keras.callbacks.EarlyStopping(
        monitor='val_loss',
        min_delta=0,
        patience=10,
        verbose=1
    )

    model.fit_generator(generate_arrays_from_file(train_data, batch_size),
                        steps_per_epoch=max(1, len(train_data) // batch_size),
                        validation_data=generate_arrays_from_file(validation_data, batch_size),
                        validation_steps=max(1, len(validation_data) // batch_size),
                        epochs=epochs,
                        callbacks=[checkpoint_period, reduce_lr, early_stopping])

    # Decode each test prediction back to a color mask and save it
    for name in test_data:
        test_img_path = test_path + '\\' + str(name) + '.jpg'
        save_img_path = save_path + '\\' + str(name) + '.png'
        test_img = cv.imread(test_img_path)
        test_img = tf.constant([test_img / 127.5 - 1])
        test_mask = model.predict(test_img)
        test_mask = np.reshape(test_mask, (img_size[0], img_size[1], num_class))
        test_mask = np.argmax(test_mask, axis=-1)
        seg_img = np.zeros((img_size[0], img_size[1], 3))
        for c in range(num_class):
            seg_img[:, :, 0] += ((test_mask == c) * (colors[c][0]))
            seg_img[:, :, 1] += ((test_mask == c) * (colors[c][1]))
            seg_img[:, :, 2] += ((test_mask == c) * (colors[c][2]))
        seg_img = seg_img.astype(np.uint8)
        cv.imwrite(save_img_path, seg_img)

Results

(Figure: segmentation results on the test images)

Summary

  UNet is a simple semantic-segmentation network; with a 572×572 input it has about 31M parameters. Its valid padding shrinks the feature maps, which suits it to segmenting large images tile by tile, and combined with its weighted loss and strong image augmentation it performs well in medical image processing.
