# Unzip the dataset: needed only on the first run; uncomment the line below to run it
# !unzip -q data/data10954/cat_12_train.zip
### Import packages
import paddle.fluid as fluid
import paddle
import numpy as np
from PIL import Image
import os
import random
from multiprocessing import cpu_count
import matplotlib.pyplot as plt
image_data_path = 'cat_12_train'              # path to the unzipped training images
label_path = 'data/data10954/train_list.txt'  # file mapping training image names to labels
train_path = 'train_data.txt'                 # training list file
test_path = 'test_data.txt'                   # test list file
## data_partition only needs to run once
def data_partition():
    '''
    @brief: split the dataset into a training set and a test set
    '''
    with open(train_path, 'w') as f_train, \
         open(test_path, 'w') as f_test, \
         open(label_path, 'r') as f:
        lines = f.readlines()
        del lines[len(lines) - 1]  # drop the last line to guard against a trailing blank line
        for i, line in enumerate(lines):
            img_path, label = line.split('\t')
            # Some images are single-channel; they cannot be used and would
            # raise an error at training time, so filter them out here.
            try:
                img = Image.open(img_path)
                img = np.array(img).astype(np.float32)
                img = img.transpose((2, 0, 1))  # raises for grayscale images
                if i % 10 == 0:
                    f_test.write(line)
                else:
                    f_train.write(line)
            except Exception as err:
                # print the problematic image
                print(img_path)
data_partition()
print("Dataset split finished")
# Data preprocessing
crop_size = 256
resize_size = 270
def image_preprocessing(sample):
    """
    @brief: preprocess one image
    @param: sample  an (img_path, label) pair
    @return: the processed image and its integer label
    """
    img_path, label = sample
    try:
        img = Image.open(img_path)
        # resize to a uniform size
        img = img.resize((resize_size, resize_size), Image.ANTIALIAS)
        # random horizontal flip
        r1 = random.random()
        if r1 > 0.5:
            img = img.transpose(Image.FLIP_LEFT_RIGHT)
        # random vertical flip
        r2 = random.random()
        if r2 > 0.5:
            img = img.transpose(Image.FLIP_TOP_BOTTOM)
        # random small rotation (-3 to 3 degrees)
        r3 = random.randint(-3, 3)
        img = img.rotate(r3, expand=False)
        # random crop back down to crop_size x crop_size
        r4 = random.randint(0, int(resize_size - crop_size))
        r5 = random.randint(0, int(resize_size - crop_size))
        box = (r4, r5, r4 + crop_size, r5 + crop_size)
        img = img.crop(box)
        # convert the image to a numpy array
        img = np.array(img.convert("RGB")).astype(np.float32)
        # HWC -> CHW
        img = img.transpose((2, 0, 1))
        # RGB -> BGR
        img = img[(2, 1, 0), :, :]
        img = img.flatten().astype('float32') / 255.0
        return img, int(label)
    except Exception as err:
        print(err)
        print("%s is a bad image; delete it and rebuild the image list" % img_path)
# Alternative preprocessing based on paddle.dataset.image (not used by the readers below)
def image_preprocessing1(sample):
    img, label = sample
    try:
        img = paddle.dataset.image.load_image(file=img, is_color=True)
        img = paddle.dataset.image.simple_transform(im=img, resize_size=resize_size, crop_size=crop_size, is_color=True, is_train=True)
        img = img.flatten().astype('float32') / 255.0
        return img, label
    except Exception as err:
        print(sample)
        print(err)
        # re-save the problematic image as 3-channel RGB, then retry
        img, label = sample
        d_img = Image.open(img).convert('RGB')
        d_img.save(img)
        img = paddle.dataset.image.load_image(file=img, is_color=True)
        img = paddle.dataset.image.simple_transform(im=img, resize_size=resize_size, crop_size=crop_size, is_color=True, is_train=True)
        img = img.flatten().astype('float32') / 255.0
        return img, label
CPU_count = cpu_count()
def train_r(train_list_path):
    def reader():
        with open(train_list_path, 'r') as f:
            lines = f.readlines()
            del lines[len(lines) - 1]  # drop the last line to guard against a trailing blank line
            for line in lines:
                img, label = line.split('\t')
                yield img, int(label)
    return paddle.reader.xmap_readers(image_preprocessing, reader, CPU_count, 1024)

def test_r(test_list_path):
    def reader():
        with open(test_list_path, 'r') as f:
            lines = f.readlines()
            for line in lines:
                img, label = line.split('\t')
                yield img, int(label)
    return paddle.reader.xmap_readers(image_preprocessing, reader, CPU_count, 1024)
BATCH_SIZE = 128
train_reader = paddle.batch(reader=paddle.reader.shuffle(reader=train_r(train_path), buf_size=128*100), batch_size=BATCH_SIZE)
test_reader = paddle.batch(reader=test_r(test_path), batch_size=BATCH_SIZE)
print("数据处理完成")
# Define a multilayer perceptron
def multilayer_perceptron(input, type_size):
    # first fully connected layer, ReLU activation
    hidden1 = fluid.layers.fc(input=input, size=100, act='relu')
    # second fully connected layer, ReLU activation
    hidden2 = fluid.layers.fc(input=hidden1, size=100, act='relu')
    # third fully connected layer, ReLU activation
    hidden3 = fluid.layers.fc(input=hidden2, size=100, act='relu')
    # fully connected output layer with softmax activation; its size is the number of classes
    prediction = fluid.layers.fc(input=hidden3, size=type_size, act='softmax')
    return prediction
def convolutional_neural_network(img, type_size):
    # first conv-pool layer, output feature map 126x126
    conv_pool_1 = fluid.nets.simple_img_conv_pool(
        input=img,        # input image
        filter_size=5,    # conv2d kernel size: int, list of ints, or tuple of ints
        num_filters=20,   # number of filters, equal to the number of output channels
        pool_size=2,      # pool2d window size 2x2: int, list of ints, or tuple of ints
        pool_stride=2,    # pool2d stride: int, list of ints, or tuple of ints
        act="relu")       # activation type
    conv_pool_1 = fluid.layers.batch_norm(conv_pool_1)
    # second conv-pool layer, output feature map 61x61
    conv_pool_2 = fluid.nets.simple_img_conv_pool(
        input=conv_pool_1,
        filter_size=5,
        num_filters=30,
        pool_size=2,
        pool_stride=2,
        act="relu")
    conv_pool_2 = fluid.layers.batch_norm(conv_pool_2)
    # third conv-pool layer, output feature map 29x29
    conv_pool_3 = fluid.nets.simple_img_conv_pool(
        input=conv_pool_2,
        filter_size=4,
        num_filters=50,
        pool_size=2,
        pool_stride=2,
        act="relu")
    conv_pool_3 = fluid.layers.batch_norm(conv_pool_3)
    # fourth conv-pool layer, output feature map 13x13
    conv_pool_4 = fluid.nets.simple_img_conv_pool(
        input=conv_pool_3,
        filter_size=4,
        num_filters=70,
        pool_size=2,
        pool_stride=2,
        act="relu")
    conv_pool_4 = fluid.layers.batch_norm(conv_pool_4)
    # fifth conv-pool layer, output feature map 5x5
    conv_pool_5 = fluid.nets.simple_img_conv_pool(
        input=conv_pool_4,
        filter_size=4,
        num_filters=100,
        pool_size=2,
        pool_stride=2,
        act="relu")
    # fully connected output layer with softmax activation; 12 classes give 12 outputs
    prediction = fluid.layers.fc(input=conv_pool_5, size=type_size, act='softmax')
    return prediction
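# A small sketch (not in the original) reproducing the feature-map sizes quoted
# in the comments above. Each stage is a valid convolution (stride 1, no
# padding) followed by 2x2 max pooling with stride 2, so the spatial size
# shrinks as out = (in - filter_size + 1) // 2.
size = crop_size  # 256
for fs in (5, 5, 4, 4, 4):
    size = (size - fs + 1) // 2
    print(size)  # prints 126, 61, 29, 13, 5 in turn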
# Define the input data
data_shape = [3, crop_size, crop_size]
images = fluid.layers.data(name='images', shape=data_shape, dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
# Pick the classifier: 1 = multilayer perceptron, 2 = CNN
type_size = 12
select_type = 1
predict = None
if select_type == 1:
    print('Using the multilayer perceptron')
    predict = multilayer_perceptron(images, type_size=type_size)
elif select_type == 2:
    print('Using the CNN')
    predict = convolutional_neural_network(images, type_size=type_size)
# Loss and accuracy: use the cross-entropy loss
cost = fluid.layers.cross_entropy(input=predict, label=label)   # cross entropy
avg_cost = fluid.layers.mean(cost)                              # mean over all elements of cost
acc = fluid.layers.accuracy(input=predict, label=label, k=1)    # accuracy from predictions and labels
# Clone a program for testing
test_program = fluid.default_main_program().clone(for_test=True)
# Define the optimizer
# optimizer = fluid.optimizer.Adam(learning_rate=1e-4, regularization=fluid.regularizer.L2DecayRegularizer(1e-4))
optimizer = fluid.optimizer.Adam(learning_rate=1e-3)
optimizer.minimize(avg_cost)
print("Network configured")
# Choose CPU or GPU: use_cuda = False for CPU, use_cuda = True for GPU
use_cuda = True  # True requires GPU compute (the premium environment)
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
# Create the executor and initialize the parameters
exe = fluid.Executor(place=place)
exe.run(program=fluid.default_startup_program())
## Define how the data is fed into the network
feeder = fluid.DataFeeder(feed_list=[images, label], place=place)
# Plotting helpers
all_train_iter = 0
all_train_iters = []
all_train_costs = []
all_train_accs = []
def draw_train_process(title, iters, costs, accs, label_cost, label_acc):
    plt.title(title, fontsize=24)
    plt.xlabel("iter", fontsize=20)
    plt.ylabel("cost/acc", fontsize=20)
    plt.plot(iters, costs, color='red', label=label_cost)
    plt.plot(iters, accs, color='green', label=label_acc)
    plt.legend()
    plt.grid()
    plt.show()
# Path where the trained model will be saved
model_save_dir = "/home/aistudio/work/cat12.inference.model"
EPOCH_NUM = 100
print('Training started')
try:
    for pass_id in range(EPOCH_NUM):
        # training
        for batch_id, data in enumerate(train_reader()):
            train_cost, train_acc = exe.run(program=fluid.default_main_program(),  # run the main program
                                            feed=feeder.feed(data),                # feed one batch of data
                                            fetch_list=[avg_cost, acc])            # fetch the mean loss and accuracy
            all_train_iter = all_train_iter + BATCH_SIZE
            all_train_iters.append(all_train_iter)
            all_train_costs.append(train_cost[0])
            all_train_accs.append(train_acc[0])
            # print the training stats every 100 batches
            if batch_id % 100 == 0:
                print('Pass:%d, Batch:%d, Cost:%0.5f, Accuracy:%0.5f' %
                      (pass_id, batch_id, train_cost[0], train_acc[0]))
            else:
                print('.', end="")
        # run a full pass over the test set after each epoch
        test_costs = []  # test losses
        test_accs = []   # test accuracies
        for batch_id, data in enumerate(test_reader()):
            test_cost, test_acc = exe.run(program=test_program,         # run the test program
                                          feed=feeder.feed(data),       # feed the data
                                          fetch_list=[avg_cost, acc])   # fetch the loss and accuracy
            test_costs.append(test_cost[0])  # record the loss of each batch
            test_accs.append(test_acc[0])    # record the accuracy of each batch
        # average the test results
        test_cost = (sum(test_costs) / len(test_costs))  # mean loss over all test batches
        test_acc = (sum(test_accs) / len(test_accs))     # mean accuracy over all test batches
        print('Test:%d, Cost:%0.5f, ACC:%0.5f' % (pass_id, test_cost, test_acc))
    # save the model, creating the directory if it does not exist
    if not os.path.exists(model_save_dir):
        os.makedirs(model_save_dir)
    print('save models to %s' % (model_save_dir))
    fluid.io.save_inference_model(model_save_dir,
                                  ['images'],
                                  [predict],
                                  exe)
    print('Trained model saved!')
    draw_train_process("training", all_train_iters, all_train_costs, all_train_accs, "training cost", "training acc")
except Exception as err:
    print("An exception occurred")
    print(err)
# Unzip the prediction dataset: needed only on the first run
# !unzip -q data/data10954/cat_12_test.zip
# Model inference
infer_exe = fluid.Executor(place)
inference_scope = fluid.core.Scope()
result_file = r'result.csv'
test_data_path = r'cat_12_test'
test_data_imgs = os.listdir(test_data_path)
print(os.path.join(test_data_path, test_data_imgs[0]))
def load_image(file):
    # open the image
    im = Image.open(file)
    # resize to the same size as the training data (crop_size x crop_size); ANTIALIAS enables anti-aliased resampling
    im = im.resize((crop_size, crop_size), Image.ANTIALIAS)
    # build the image array, float32
    im = np.array(im.convert("RGB")).astype(np.float32)
    # HWC -> CHW
    im = im.transpose((2, 0, 1))
    # RGB -> BGR, matching the training-time preprocessing
    im = im[(2, 1, 0), :, :]
    # scale the pixel values from [0, 255] to [0, 1]
    im = im / 255.0
    # add a batch dimension to match the network's input shape
    im = np.expand_dims(im, axis=0)
    return im
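# A minimal sanity check (not in the original notebook): load_image() should
# return a single CHW image with a leading batch dimension, matching the
# [3, crop_size, crop_size] input the network was built with.
demo_input = load_image(os.path.join(test_data_path, test_data_imgs[0]))
assert demo_input.shape == (1, 3, crop_size, crop_size)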
with fluid.scope_guard(inference_scope):
    # Load the inference model from the save directory:
    #   inference_program: the program used for prediction
    #   feed_target_names: a list of str, the names of the variables that must be fed to the inference program
    #   fetch_targets: a list of Variables from which the inference results are obtained
    [inference_program,
     feed_target_names,
     fetch_targets] = fluid.io.load_inference_model(model_save_dir,
                                                    infer_exe)  # infer_exe: the executor that runs the inference model
    with open(result_file, 'w') as f_result:
        for i in range(len(test_data_imgs)):
            infer_path = os.path.join(test_data_path, test_data_imgs[i])
            img = load_image(infer_path)
            results = infer_exe.run(inference_program,                 # run the inference program
                                    feed={feed_target_names[0]: img},  # feed the image to predict
                                    fetch_list=fetch_targets)          # get the inference results
            # sample output line: gN2xK8HUbjFWl1kGyCvMJehiBPwzSdOu.jpg,9
            f_result.write(test_data_imgs[i] + ',' + str(np.argmax(results[0])) + '\n')
print('Inference finished')