加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
train.py 9.70 KB
一键复制 编辑 原始数据 按行查看 历史
肆十二 提交于 2022-12-05 11:52 . cls
from torchutils import *
from torchvision import datasets, models, transforms
import os.path as osp
import os
if torch.cuda.is_available():
device = torch.device('cuda:0')
else:
device = torch.device('cpu')
print(f'Using device: {device}')
# 固定随机种子,保证实验结果是可以复现的
seed = 42
os.environ['PYTHONHASHSEED'] = str(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = True
data_path = "D:/upppppppppp/cls/flowers_data_split" # todo 数据集路径
# 注: 执行之前请先划分数据集
# 超参数设置
params = {
# 'model': 'vit_tiny_patch16_224', # 选择预训练模型
# 'model': 'resnet50d', # 选择预训练模型
'model': 'efficientnet_b3a', # 选择预训练模型
"img_size": 224, # 图片输入大小
"train_dir": osp.join(data_path, "train"), # todo 训练集路径
"val_dir": osp.join(data_path, "val"), # todo 验证集路径
'device': device, # 设备
'lr': 1e-3, # 学习率
'batch_size': 4, # 批次大小
'num_workers': 0, # 进程
'epochs': 10, # 轮数
"save_dir": "../checkpoints/", # todo 保存路径
"pretrained": True,
"num_classes": len(os.listdir(osp.join(data_path, "train"))), # 类别数目, 自适应获取类别数目
'weight_decay': 1e-5 # 学习率衰减
}
# 定义模型
class SELFMODEL(nn.Module):
def __init__(self, model_name=params['model'], out_features=params['num_classes'],
pretrained=True):
super().__init__()
self.model = timm.create_model(model_name, pretrained=pretrained) # 从预训练的库中加载模型
# self.model = timm.create_model(model_name, pretrained=pretrained, checkpoint_path="pretrained/resnet50d_ra2-464e36ba.pth") # 从预训练的库中加载模型
# classifier
if model_name[:3] == "res":
n_features = self.model.fc.in_features # 修改全连接层数目
self.model.fc = nn.Linear(n_features, out_features) # 修改为本任务对应的类别数目
elif model_name[:3] == "vit":
n_features = self.model.head.in_features # 修改全连接层数目
self.model.head = nn.Linear(n_features, out_features) # 修改为本任务对应的类别数目
else:
n_features = self.model.classifier.in_features
self.model.classifier = nn.Linear(n_features, out_features)
# resnet修改最后的全链接层
print(self.model) # 返回模型
def forward(self, x): # 前向传播
x = self.model(x)
return x
# 定义训练流程
def train(train_loader, model, criterion, optimizer, epoch, params):
metric_monitor = MetricMonitor() # 设置指标监视器
model.train() # 模型设置为训练模型
nBatch = len(train_loader)
stream = tqdm(train_loader)
for i, (images, target) in enumerate(stream, start=1): # 开始训练
images = images.to(params['device'], non_blocking=True) # 加载数据
target = target.to(params['device'], non_blocking=True) # 加载模型
output = model(images) # 数据送入模型进行前向传播
loss = criterion(output, target.long()) # 计算损失
f1_macro = calculate_f1_macro(output, target) # 计算f1分数
recall_macro = calculate_recall_macro(output, target) # 计算recall分数
acc = accuracy(output, target) # 计算准确率分数
metric_monitor.update('Loss', loss.item()) # 更新损失
metric_monitor.update('F1', f1_macro) # 更新f1
metric_monitor.update('Recall', recall_macro) # 更新recall
metric_monitor.update('Accuracy', acc) # 更新准确率
optimizer.zero_grad() # 清空学习率
loss.backward() # 损失反向传播
optimizer.step() # 更新优化器
lr = adjust_learning_rate(optimizer, epoch, params, i, nBatch) # 调整学习率
stream.set_description( # 更新进度条
"Epoch: {epoch}. Train. {metric_monitor}".format(
epoch=epoch,
metric_monitor=metric_monitor)
)
return metric_monitor.metrics['Accuracy']["avg"], metric_monitor.metrics['Loss']["avg"] # 返回结果
# 定义验证流程
def validate(val_loader, model, criterion, epoch, params):
metric_monitor = MetricMonitor() # 验证流程
model.eval() # 模型设置为验证格式
stream = tqdm(val_loader) # 设置进度条
with torch.no_grad(): # 开始推理
for i, (images, target) in enumerate(stream, start=1):
images = images.to(params['device'], non_blocking=True) # 读取图片
target = target.to(params['device'], non_blocking=True) # 读取标签
output = model(images) # 前向传播
loss = criterion(output, target.long()) # 计算损失
f1_macro = calculate_f1_macro(output, target) # 计算f1分数
recall_macro = calculate_recall_macro(output, target) # 计算recall分数
acc = accuracy(output, target) # 计算acc
metric_monitor.update('Loss', loss.item()) # 后面基本都是更新进度条的操作
metric_monitor.update('F1', f1_macro)
metric_monitor.update("Recall", recall_macro)
metric_monitor.update('Accuracy', acc)
stream.set_description(
"Epoch: {epoch}. Validation. {metric_monitor}".format(
epoch=epoch,
metric_monitor=metric_monitor)
)
return metric_monitor.metrics['Accuracy']["avg"], metric_monitor.metrics['Loss']["avg"]
# 展示训练过程的曲线
def show_loss_acc(acc, loss, val_acc, val_loss, sava_dir):
# 从history中提取模型训练集和验证集准确率信息和误差信息
# 按照上下结构将图画输出
plt.figure(figsize=(8, 8))
plt.subplot(2, 1, 1)
plt.plot(acc, label='Training Accuracy')
plt.plot(val_acc, label='Validation Accuracy')
plt.legend(loc='lower right')
plt.ylabel('Accuracy')
plt.ylim([min(plt.ylim()), 1])
plt.title('Training and Validation Accuracy')
plt.subplot(2, 1, 2)
plt.plot(loss, label='Training Loss')
plt.plot(val_loss, label='Validation Loss')
plt.legend(loc='upper right')
plt.ylabel('Cross Entropy')
plt.title('Training and Validation Loss')
plt.xlabel('epoch')
# 保存在savedir目录下。
save_path = osp.join(save_dir, "results.png")
plt.savefig(save_path, dpi=100)
if __name__ == '__main__':
accs = []
losss = []
val_accs = []
val_losss = []
data_transforms = get_torch_transforms(img_size=params["img_size"]) # 获取图像预处理方式
train_transforms = data_transforms['train'] # 训练集数据处理方式
valid_transforms = data_transforms['val'] # 验证集数据集处理方式
train_dataset = datasets.ImageFolder(params["train_dir"], train_transforms) # 加载训练集
valid_dataset = datasets.ImageFolder(params["val_dir"], valid_transforms) # 加载验证集
if params['pretrained'] == True:
save_dir = osp.join(params['save_dir'], params['model']+"_pretrained_" + str(params["img_size"])) # 设置模型保存路径
else:
save_dir = osp.join(params['save_dir'], params['model'] + "_nopretrained_" + str(params["img_size"])) # 设置模型保存路径
if not osp.isdir(save_dir): # 如果保存路径不存在的话就创建
os.makedirs(save_dir) #
print("save dir {} created".format(save_dir))
train_loader = DataLoader( # 按照批次加载训练集
train_dataset, batch_size=params['batch_size'], shuffle=True,
num_workers=params['num_workers'], pin_memory=True,
)
val_loader = DataLoader( # 按照批次加载验证集
valid_dataset, batch_size=params['batch_size'], shuffle=False,
num_workers=params['num_workers'], pin_memory=True,
)
print(train_dataset.classes)
model = SELFMODEL(model_name=params['model'], out_features=params['num_classes'],
pretrained=params['pretrained']) # 加载模型
# model = nn.DataParallel(model) # 模型并行化,提高模型的速度
# resnet50d_1epochs_accuracy0.50424_weights.pth
model = model.to(params['device']) # 模型部署到设备上
criterion = nn.CrossEntropyLoss().to(params['device']) # 设置损失函数
optimizer = torch.optim.AdamW(model.parameters(), lr=params['lr'], weight_decay=params['weight_decay']) # 设置优化器
# 损失函数和优化器可以自行设置修改。
# criterion = nn.CrossEntropyLoss().to(params['device']) # 设置损失函数
# optimizer = torch.optim.AdamW(model.parameters(), lr=params['lr'], weight_decay=params['weight_decay']) # 设置优化器
best_acc = 0.0 # 记录最好的准确率
# 只保存最好的那个模型。
for epoch in range(1, params['epochs'] + 1): # 开始训练
acc, loss = train(train_loader, model, criterion, optimizer, epoch, params)
val_acc, val_loss = validate(val_loader, model, criterion, epoch, params)
accs.append(acc)
losss.append(loss)
val_accs.append(val_acc)
val_losss.append(val_loss)
if val_acc >= best_acc:
# 保存的时候设置一个保存的间隔,或者就按照目前的情况,如果前面的比后面的效果好,就保存一下。
# 按照间隔保存的话得不到最好的模型。
save_path = osp.join(save_dir, f"{params['model']}_{epoch}epochs_accuracy{acc:.5f}_weights.pth")
torch.save(model.state_dict(), save_path)
best_acc = val_acc
show_loss_acc(accs, losss, val_accs, val_losss, save_dir)
print("训练已完成,模型和训练日志保存在: {}".format(save_dir))
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化