import numpy as np
import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras.layers import Input, Lambda
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import TensorBoard, ReduceLROnPlateau, EarlyStopping
from nets.yolo4 import yolo_body
from nets.loss import yolo_loss
from utils.utils import get_random_data, get_random_data_with_Mosaic, rand, WarmUpCosineDecayScheduler, ModelCheckpoint
from functools import partial
from tqdm import tqdm
import time
import os
# 获得类和先验框
def get_classes(classes_path):
'''loads the classes'''
with open(classes_path) as f:
class_names = f.readlines()
class_names = [c.strip() for c in class_names]
return class_names
def get_anchors(anchors_path):
'''loads the anchors from a file'''
with open(anchors_path) as f:
anchors = f.readline()
anchors = [float(x) for x in anchors.split(',')]
return np.array(anchors).reshape(-1, 2)
# 训练数据生成器
def data_generator(annotation_lines, batch_size, input_shape, anchors, num_classes, mosaic=False):
'''data generator for fit_generator'''
n = len(annotation_lines)
i = 0
flag = True
while True:
image_data = []
box_data = []
for b in range(batch_size):
if i==0:
if mosaic:
if flag and (i+4) < n:
image, box = get_random_data_with_Mosaic(annotation_lines[i:i+4], input_shape)
i = (i+1) % n
image, box = get_random_data(annotation_lines[i], input_shape)
i = (i+1) % n
flag = bool(1-flag)
image, box = get_random_data(annotation_lines[i], input_shape)
i = (i+1) % n
image_data = np.array(image_data)
box_data = np.array(box_data)
y_true = preprocess_true_boxes(box_data, input_shape, anchors, num_classes)
yield image_data, y_true[0], y_true[1], y_true[2]
# 读入xml文件,并输出y_true
def preprocess_true_boxes(true_boxes, input_shape, anchors, num_classes):
assert (true_boxes[..., 4]<num_classes).all(), 'class id must be less than num_classes'
# 一共有三个特征层数
num_layers = len(anchors)//3
# 先验框
# 678为 142,110, 192,243, 459,401
# 345为 36,75, 76,55, 72,146
# 012为 12,16, 19,36, 40,28
anchor_mask = [[6,7,8], [3,4,5], [0,1,2]] if num_layers==3 else [[3,4,5], [1,2,3]]
true_boxes = np.array(true_boxes, dtype='float32')
input_shape = np.array(input_shape, dtype='int32') # 416,416
# 读出xy轴,读出长宽
# 中心点(m,n,2)
boxes_xy = (true_boxes[..., 0:2] + true_boxes[..., 2:4]) // 2
boxes_wh = true_boxes[..., 2:4] - true_boxes[..., 0:2]
# 计算比例
true_boxes[..., 0:2] = boxes_xy/input_shape[::-1]
true_boxes[..., 2:4] = boxes_wh/input_shape[::-1]
# m张图
m = true_boxes.shape[0]
# 得到网格的shape为13,13;26,26;52,52
grid_shapes = [input_shape//{0:32, 1:16, 2:8}[l] for l in range(num_layers)]
# y_true的格式为(m,13,13,3,85)(m,26,26,3,85)(m,52,52,3,85)
y_true = [np.zeros((m,grid_shapes[l][0],grid_shapes[l][1],len(anchor_mask[l]),5+num_classes),
dtype='float32') for l in range(num_layers)]
# [1,9,2]
anchors = np.expand_dims(anchors, 0)
anchor_maxes = anchors / 2.
anchor_mins = -anchor_maxes
# 长宽要大于0才有效
valid_mask = boxes_wh[..., 0]>0
for b in range(m):
# 对每一张图进行处理
wh = boxes_wh[b, valid_mask[b]]
if len(wh)==0: continue
# [n,1,2]
wh = np.expand_dims(wh, -2)
box_maxes = wh / 2.
box_mins = -box_maxes
# 计算真实框和哪个先验框最契合
intersect_mins = np.maximum(box_mins, anchor_mins)
intersect_maxes = np.minimum(box_maxes, anchor_maxes)
intersect_wh = np.maximum(intersect_maxes - intersect_mins, 0.)
intersect_area = intersect_wh[..., 0] * intersect_wh[..., 1]
box_area = wh[..., 0] * wh[..., 1]
anchor_area = anchors[..., 0] * anchors[..., 1]
iou = intersect_area / (box_area + anchor_area - intersect_area)
# 维度是(n) 感谢 消尽不死鸟 的提醒
best_anchor = np.argmax(iou, axis=-1)
for t, n in enumerate(best_anchor):
for l in range(num_layers):
if n in anchor_mask[l]:
# floor用于向下取整
i = np.floor(true_boxes[b,t,0]*grid_shapes[l][1]).astype('int32')
j = np.floor(true_boxes[b,t,1]*grid_shapes[l][0]).astype('int32')
# 找到真实框在特征层l中第b副图像对应的位置
k = anchor_mask[l].index(n)
c = true_boxes[b,t, 4].astype('int32')
y_true[l][b, j, i, k, 0:4] = true_boxes[b,t, 0:4]
y_true[l][b, j, i, k, 4] = 1
y_true[l][b, j, i, k, 5+c] = 1
return y_true
# 防止bug
def get_train_step_fn():
def train_step(imgs, yolo_loss, targets, net, optimizer, regularization):
with tf.GradientTape() as tape:
# 计算loss
P5_output, P4_output, P3_output = net(imgs, training=True)
args = [P5_output, P4_output, P3_output] + targets
loss_value = yolo_loss(args,anchors,num_classes,label_smoothing=label_smoothing)
if regularization:
# 加入正则化损失
loss_value = tf.reduce_sum(net.losses) + loss_value
grads = tape.gradient(loss_value, net.trainable_variables)
optimizer.apply_gradients(zip(grads, net.trainable_variables))
return loss_value
return train_step
def fit_one_epoch(net, yolo_loss, optimizer, epoch, epoch_size, epoch_size_val, gen, genval, Epoch, anchors,
num_classes, label_smoothing, regularization=False, train_step=None):
loss = 0
val_loss = 0
start_time = time.time()
with tqdm(total=epoch_size,desc=f'Epoch {epoch + 1}/{Epoch}',postfix=dict,mininterval=0.3) as pbar:
for iteration, batch in enumerate(gen):
if iteration>=epoch_size:
images, target0, target1, target2 = batch[0], batch[1], batch[2], batch[3]
targets = [target0, target1, target2]
targets = [tf.convert_to_tensor(target) for target in targets]
loss_value = train_step(images, yolo_loss, targets, net, optimizer, regularization)
loss = loss + loss_value.numpy()
waste_time = time.time() - start_time
pbar.set_postfix(**{'total_loss': float(loss) / (iteration + 1),
'lr' : optimizer._decayed_lr(tf.float32).numpy(),
'step/s' : waste_time})
start_time = time.time()
print('Start Validation')
with tqdm(total=epoch_size_val, desc=f'Epoch {epoch + 1}/{Epoch}',postfix=dict,mininterval=0.3) as pbar:
for iteration, batch in enumerate(genval):
if iteration>=epoch_size_val:
# 计算验证集loss
images, target0, target1, target2 = batch[0], batch[1], batch[2], batch[3]
targets = [target0, target1, target2]
targets = [tf.convert_to_tensor(target) for target in targets]
P5_output, P4_output, P3_output = net(images)
args = [P5_output, P4_output, P3_output] + targets
loss_value = yolo_loss(args,anchors,num_classes,label_smoothing=label_smoothing)
if regularization:
# 加入正则化损失
loss_value = tf.reduce_sum(net.losses) + loss_value
# 更新验证集loss
val_loss = val_loss + loss_value.numpy()
pbar.set_postfix(**{'total_loss': float(val_loss)/ (iteration + 1)})
print('Finish Validation')
print('Epoch:'+ str(epoch+1) + '/' + str(Epoch))
print('Total Loss: %.4f || Val Loss: %.4f ' % (loss/(epoch_size+1),val_loss/(epoch_size_val+1)))
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
# 检测精度mAP和pr曲线计算参考视频
# https://www.bilibili.com/video/BV1zE411u7Vw
if __name__ == "__main__":
# 视频中说的速度慢问题已经解决了很多
# 现在train.py和train_eager.py速度差距不大
# 如果还有改进速度的地方可以私信告诉我!
# 标签的位置
annotation_path = '2007_train.txt'
# 获取classes和anchor的位置
classes_path = 'model_data/voc_classes.txt'
anchors_path = 'model_data/yolo_anchors.txt'
# 权值文件请看README,百度网盘下载
# 训练自己的数据集时提示维度不匹配正常
# 预测的东西都不一样了自然维度不匹配
weights_path = 'model_data/yolo4_weight.h5'
# 获得classes和anchor
class_names = get_classes(classes_path)
anchors = get_anchors(anchors_path)
# 一共有多少类
num_classes = len(class_names)
num_anchors = len(anchors)
# 输入的shape大小
# 显存比较小可以使用416x416
# 现存比较大可以使用608x608
input_shape = (416,416)
# tricks的使用设置
mosaic = True
Cosine_scheduler = False
label_smoothing = 0
# 是否使用正则化
regularization = True
# Dataloder的使用
Use_Data_Loader = True
# 输入的图像为
image_input = Input(shape=(None, None, 3))
h, w = input_shape
# 创建yolo模型
print('Create YOLOv4 model with {} anchors and {} classes.'.format(num_anchors, num_classes))
model_body = yolo_body(image_input, num_anchors//3, num_classes)
# 载入预训练权重
print('Load weights {}.'.format(weights_path))
model_body.load_weights(weights_path, by_name=True, skip_mismatch=True)
# 0.1用于验证,0.9用于训练
val_split = 0.1
with open(annotation_path) as f:
lines = f.readlines()
num_val = int(len(lines)*val_split)
num_train = len(lines) - num_val
# 主干特征提取网络特征通用,冻结训练可以加快训练速度
# 也可以在训练初期防止权值被破坏。
# Init_Epoch为起始世代
# Freeze_Epoch为冻结训练的世代
# Epoch总训练世代
# 提示OOM或者显存不足请调小Batch_size
freeze_layers = 302
for i in range(freeze_layers): model_body.layers[i].trainable = False
print('Freeze the first {} layers of total {} layers.'.format(freeze_layers, len(model_body.layers)))
# 调整非主干模型first
if True:
Init_epoch = 0
Freeze_epoch = 50
# batch_size大小,每次喂入多少数据
batch_size = 2
# 最大学习率
learning_rate_base = 1e-3
if Use_Data_Loader:
gen = partial(data_generator, annotation_lines = lines[:num_train], batch_size = batch_size, input_shape = input_shape,
anchors = anchors, num_classes = num_classes, mosaic=mosaic)
gen = tf.data.Dataset.from_generator(gen, (tf.float32, tf.float32, tf.float32, tf.float32))
gen_val = partial(data_generator, annotation_lines = lines[num_train:], batch_size = batch_size,
input_shape = input_shape, anchors = anchors, num_classes = num_classes, mosaic=False)
gen_val = tf.data.Dataset.from_generator(gen_val, (tf.float32, tf.float32, tf.float32, tf.float32))
gen = gen.shuffle(buffer_size=batch_size).prefetch(buffer_size=batch_size)
gen_val = gen_val.shuffle(buffer_size=batch_size).prefetch(buffer_size=batch_size)
gen = data_generator(lines[:num_train], batch_size, input_shape, anchors, num_classes, mosaic=mosaic)
gen_val = data_generator(lines[num_train:], batch_size, input_shape, anchors, num_classes, mosaic=False)
epoch_size = num_train//batch_size
epoch_size_val = num_val//batch_size
if Cosine_scheduler:
lr_schedule = tf.keras.experimental.CosineDecayRestarts(
initial_learning_rate = learning_rate_base,
first_decay_steps = 5*epoch_size,
t_mul = 1.0,
alpha = 1e-2
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
print('Train on {} samples, val on {} samples, with batch size {}.'.format(num_train, num_val, batch_size))
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
for epoch in range(Init_epoch,Freeze_epoch):
fit_one_epoch(model_body, yolo_loss, optimizer, epoch, epoch_size, epoch_size_val,gen, gen_val,
Freeze_epoch, anchors, num_classes, label_smoothing, regularization, get_train_step_fn())
for i in range(freeze_layers): model_body.layers[i].trainable = True
# 解冻后训练
if True:
Freeze_epoch = 50
Epoch = 100
# batch_size大小,每次喂入多少数据
batch_size = 2
# 最大学习率
learning_rate_base = 1e-4
if Use_Data_Loader:
gen = partial(data_generator, annotation_lines = lines[:num_train], batch_size = batch_size, input_shape = input_shape,
anchors = anchors, num_classes = num_classes, mosaic=mosaic)
gen = tf.data.Dataset.from_generator(gen, (tf.float32, tf.float32, tf.float32, tf.float32))
gen_val = partial(data_generator, annotation_lines = lines[num_train:], batch_size = batch_size,
input_shape = input_shape, anchors = anchors, num_classes = num_classes, mosaic=False)
gen_val = tf.data.Dataset.from_generator(gen_val, (tf.float32, tf.float32, tf.float32, tf.float32))
gen = gen.shuffle(buffer_size=batch_size).prefetch(buffer_size=batch_size)
gen_val = gen_val.shuffle(buffer_size=batch_size).prefetch(buffer_size=batch_size)
gen = data_generator(lines[:num_train], batch_size, input_shape, anchors, num_classes, mosaic=mosaic)
gen_val = data_generator(lines[num_train:], batch_size, input_shape, anchors, num_classes, mosaic=False)
epoch_size = num_train//batch_size
epoch_size_val = num_val//batch_size
if Cosine_scheduler:
lr_schedule = tf.keras.experimental.CosineDecayRestarts(
initial_learning_rate = learning_rate_base,
first_decay_steps = 5*epoch_size,
t_mul = 1.0,
alpha = 1e-2
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
decay_steps = epoch_size,
print('Train on {} samples, val on {} samples, with batch size {}.'.format(num_train, num_val, batch_size))
optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)
for epoch in range(Freeze_epoch,Epoch):
fit_one_epoch(model_body, yolo_loss, optimizer, epoch, epoch_size, epoch_size_val,gen, gen_val,
Epoch, anchors, num_classes, label_smoothing, regularization, get_train_step_fn())
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。