加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
testdemo.py 20.45 KB
一键复制 编辑 原始数据 按行查看 历史
咭咭咭熊 提交于 2021-10-20 20:17 . Image Caption代码学习
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546
# Bahdanau的注意力或本地注意力
# 1.导入所需的库
import string
import numpy as np
import pandas as pd
from numpy import array
from pickle import load
from PIL import Image
import pickle# 序列化库
from collections import Counter
import matplotlib.pyplot as plt
import sys,time,os,warnings
warnings.filterwarnings("ignore")
import re
import keras
import tensorflow as tf
from tqdm import tqdm
from nltk.translate.bleu_score import sentence_bleu
from keras.preprocessing.sequence import pad_sequences
from keras.preprocessing.image import load_img, img_to_array
# from keras.utils import to_categorical
# from keras.utils import plot_model
from keras.models import Model
from keras.layers import Input
from keras.layers import Dense,BatchNormalization
from keras.layers import LSTM
from keras.layers import Embedding
from keras.layers import Dropout
from keras.preprocessing.text import Tokenizer
from keras.applications.vgg16 import VGG16,preprocess_input
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
# Bleu评估方法:用于分析待评估的翻译语句与参考翻译语句之n-gram的相关性
# 编码器-解码器图像字幕系统:使用产生隐藏状态的预训练卷积神经网络对图像进行编码;
# 然后使用LSTM解码此隐藏状态生成标题
# 1)在Bahdanau或本地关注中,关注仅放在少数几个来源位置
# 2)本地注意力——只关注每个目标词的编码器隐藏状态的一小部分
# 3)全局注意力——集中于所有目标词的所有来源
# 4)局部注意力——首先找到对其位置,然后在对其位置所在的左右窗口中计算注意力权重,最后对上下文向量进行加权;【减少了注意力机制计算的成本】
# 在计算中,本地注意力不是考虑源语言端的所有单词,而是根据预测函数预测在当前解码时要对其的源语言端的位置,然后在窗口中导航,仅考虑窗口中的单词
# 2.数据加载和预处理
# (1)定义图像和字幕路径,并检查数据集中总共有多少图像
image_path = "D:\PyCharm2018\PyCharmProject\TestImgCap\data\Flicker8k_Dataset"
dir_Flickr_text = "D:\PyCharm2018\PyCharmProject\TestImgCap\data\Flickr8k_text\Flickr8k.token.txt"
jpgs = os.listdir(image_path)
# (2) 以表格的形式来存储图像ID和标题
file = open(dir_Flickr_text, 'r')
text = file.read()
file.close()
datatxt = []
for line in text.split('\n'):
col = line.split('\t')
if len(col) == 1:
continue
w = col[0].split("#")
datatxt.append(w + [col[1].lower()])
data = pd.DataFrame(datatxt, columns=["filename", "index", "caption"])
data = data.reindex(columns=['index', 'filename', 'caption'])
data = data[data.filename != '2258277193_586949ec62.jpg.1']
uni_filenames = np.unique(data.filename.values)
# print(data.head())
# (3)可视化图片及其对应的5个标题
# plt.figure(figsize=(10,20)):设置图形的大小(宽,高)
# loc——根据index索引
# iloc——根据行号来素引
npic = 5
npix = 224
target_size = (npix, npix, 3)
count = 1
# plt.figure:设置创建口尺寸
fig = plt.figure(figsize=(10, 20))
# num[a:b] numpy的切片操作
for jpgfnm in uni_filenames[10:14]:
filename = image_path + '/' + jpgfnm
captions = list(data["caption"].loc[data["filename"] == jpgfnm].values)
image_load = load_img(filename, target_size=target_size)
# 子图:就是在一张画布生成多张子图
# ax = fig.add_subplot(子图总行数,子图总列数,子图位置)
ax = fig.add_subplot(npic, 2, count, xticks=[], yticks=[])
ax.imshow(image_load)
count += 1
ax = fig.add_subplot(npic, 2, count)
plt.axis('off')# 关闭坐标轴
ax.plot()
# ax.set_xlim(设置坐标轴的范围,设置精度及精度的范围)
ax.set_xlim(0, 1)
ax.set_ylim(0, len(captions))
for i, caption in enumerate(captions):
ax.text(0, i, caption, fontsize=20)
count += 1
plt.show()
# (4)统计当前的词汇量
vocabulary = []
for txt in data.caption.values:
vocabulary.extend(txt.split())
# Vocabluary Size : 8918
# print('Vocabluary Size : %d' %len(set(vocabulary)))
# (5)文本清理:如删除标点符号、单个字符和数字值
def remove_punctuation(text_original):
text_no_punctation = text_original.translate(string.punctuation)
return(text_no_punctation)
def remove_single_character(text):
text_len_more_than1 = ""
for word in text.split():
if len(word) > 1:
text_len_more_than1 += " " + word
return(text_len_more_than1)
def remove_numeric(text):
text_no_numeric = ""
for word in text.split():
# word.isalpha():检测字符串是否只由字母组成
isalpha = word.isalpha()
if isalpha:
text_no_numeric += " " + word
return (text_no_numeric)
def text_clean(text_original):
text = remove_punctuation(text_original)
text = remove_single_character(text)
text = remove_numeric(text)
return (text)
for i, caption in enumerate(data.caption.values):
newcaption = text_clean(caption)
data["caption"].iloc[i] = newcaption
#(6)统计文本处理之后的的词汇量大小
clean_vocabluary = []
for txt in data.caption.values:
clean_vocabluary.extend(txt.split())
# Clean Vocabulary Size 8357
# print('Clean Vocabulary Size: %d' %len(set(clean_vocabluary)))
# (7)将图像路径保存在两个列表中,以便于可以使用路径集立即加载图像
# 对于标题加入两个标签:<start><end>
PATH = "D:\PyCharm2018\PyCharmProject\TestImgCap\data\Flickr8k_text\Flickr8k.token.txt"
all_captions = []
for caption in data["caption"].astype(str):
caption = '<start> ' + caption + ' <end>'
all_captions.append(caption)
print(all_captions[:10])
# 保存图像的列表
all_img_name_vector = []
for annot in data["filename"]:
full_image_path = PATH + annot
all_img_name_vector.append(full_image_path)
print(all_img_name_vector[:10])
#(8)查看列表中保存的图像和标题数量
# len(all_img_vector) : 40455
print(f"len(all_img_vector) : {len(all_img_name_vector)}")
# len(all_captions) : 40455
print(f"len(all_captions) : {len(all_captions)}")
#(9)定义一个函数:将数据及大小限制在40000个图像和标题
def data_limiter(num,total_captions,all_img_name_vector):
train_captions,img_name_vector = shuffle(total_captions,all_img_name_vector,random_state=1)
train_captions = train_captions[:num]
img_name_vector = img_name_vector[:num]
return train_captions,img_name_vector
# train_captions,img_name_vector = data_limiter(40000,total_captions,all_img_name_vector)
train_captions,img_name_vector = data_limiter(40000,all_captions,all_img_name_vector)
# if __name__ == '__main__':
#
# # Total Images in Dataset = 8091
# # print("Total Images in Dataset = {}".format(len(jpgs)))
# print(data.head())
# 3.模型定义
# 使用VGG16定义图像特征提取模型,此处不需要分类图像们只需要为图像提取矢量即可
# 因此取出掉模型的softmax层
# 首先将所有图像预处理为相同大小,即224 * 224,然后再将其输入模型
# (1)提取图像特征
def load_image(image_path):
img = tf.io.read_file(image_path)
img = tf.image.decode_jpeg(img,channels=3)
img = tf.image.resize(img,(224,224))
img = preprocess_input(img)
return img,image_path
image_model = tf.keras.applications.VGG16(include_top=False, weights='imagenet')
new_input = image_model.input
hidden_layer = image_model.layers[-1].output
image_features_extract_model = tf.keras.Model(new_input, hidden_layer)
image_features_extract_model.summary()
# (2)将每个图片名称映射到要加载图片的函数
encode_train= sorted(set(img_name_vector))
image_dataset = tf.data.Dataset.from_tensor_slices(encode_train)
image_dataset = image_dataset.map(load_image,num_parallel_calls = tf.data.experimental.AUTOTUNE).batch(64)
# (3)提取图像特征并将其存储在各自的.npy文件中,然后将这些特征通过编码器传递
# .npy文件存储在任何计算机上重建数组所需的所有信息,包括dtype和shape信息
import time
for img, path in tqdm(image_dataset):
batch_features = image_features_extract_model(img)
batch_features = tf.reshape(batch_features,
(batch_features.shape[0], -1, batch_features.shape[3]))
for bf, p in zip(batch_features, path):
path_of_feature = p.numpy().decode("utf-8")
np.save(path_of_feature, bf.numpy())
# (4)标记标题,并为数据中所有唯一的单词建立词汇表
# 将词汇量限制在前5000个单词,以节省内存。将位置词汇更换为<UNk>
top_k = 5000
tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words = top_k,oov_token = "<UNK>",filters='!"#$%&()*+.,-/:;=?@[\]^_`{|}~ ')
tokenizer.fit_on_texts(train_captions)
train_seqs = tokenizer.texts_to_sequence(train_captions)
cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs,padding = 'post')
# (5)可视化填充的训练和标题以及标记化的向量
train_captions[:3]
train_seqs[:3]
# (6)计算所有字幕的最大和最小长度
def calc_max_length(tensor):
return max(len(t) for t in tensor)
max_length = calc_max_length(train_seqs)
def calc_min_length(tensor):
return min(len(t) for t in tensor)
min_length = calc_min_length(train_seqs)
print("Max Length of any caption : Min length of any caption = " + str(max_length) + ":" + str(min_length))
# (7) 按比例拆分训练集和验证集
# train_test_split(所要划分的样本特征集,所要划分的样本结果,样本占比test_size,随机数的种子random_state)
img_name_train, img_name_val, cap_train, cap_val = train_test_split(img_name_vector, cap_vector, test_size=0.2, random_state=0)
# (8)定义训练参数
# tf.data高效的数据流水线模块: 使用并行化策略听哈奥训练流程效率
#DataSet.prefetch()方法:进行数据预加载后的训练流程,,在GPU训练的同时CPU进行数据预加载,提高训练的效率
# 参数BufferSize:可以手动设置,也可以设置为tf.data.experimental.AUTOTUNE:自动选择合适的值
#
BATCH_SIZE = 64
BUFFER_SIZE= 1000
embedding_dim = 256
utits = 512 # 也称隐藏层,表示的是每个LSTM单元里面前馈神经网络的输出维度,每一门的计算都有一个前馈网络层;
vocab_size = len(tokenizer.word_index) + 1
num_steps = len(img_name_train)
features_shape = 512
attention_features_shape = 49
def map_func(img_name,cap):
img_tensor = np.load(img_name.decode('utf-8') + '.npy')
return img_tensor,cap
dataset = tf.data.Dataset.from_tensor_slices(img_name_train,cap_train)
# DataSert.map():也可以利用多GPU资源,并行化的对数据项进行变换,从而提高效率;
# 通过设置num_parallel——calls参数实现数据转换的并行化;
dataset = dataset.map(lambda item1,item2 : tf.numpy_function(
map_func,[item2,item2],[tf.float32,tf.int32]),
num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
dataset = dataset.prefetch(buffle_size = tf.data.experimental.AUTOTUNE)
# (9)定义编码器—解码器架构
class VGG16_Encoder(tf.kears.Model):
# Encoder通过一个全连接层提取图像特征
def __init__(self,embedding_dim):
super(VGG16_Encoder, self).__init__()
# shape after fc == (batch_size,49,embedding_dim)
self.fc = tf.keras.layers.Dense(embedding_dim)
self.dropout = tf.keras.layers.Dropout(0.5,noise_shape = None,seed = None)
def call(self,x):
# x = self.dropout(x)
x = self.fc(x)
x = tf.nn.relu(x)
return x
# 基于GPU/CPU 功能定义RNN
# tf.keras.LSTM()接口的参数:
# 1)utils:也称隐藏层,表示的是每个LSTM单元里面前馈神经网络的输出维度;
# 2)return_sequence:布尔值:返回输出序列中的最后一个输出(False),还是全部序列(True)
# 3)return_state:布尔值,除了输出之外是否返回最后一个状态
# 4)recurrent_initializer:权重矩阵的初始化容器,用于循环层状态的线性转换
# 5)recurrent_activation:用于重复步骤的激活功能
def run_type(units):
if tf.test.is_gpu_available():
return tf.compat.v1.keras.layers.CuDNNLSTM(units,
return_sequence = True,
return_state = True,
recurrent_initializer = 'glorot+_uniform')
else:
return tf.keras.layers.GPU(utits,
return_sequences = True,
return_state = True,
recurent_activation = 'sigmoid',
recurrent_initializer = 'glorot_uniform')
# (10)使用Bahdanua注意定义RNN解码器
# 编码器的输出、隐藏层的初始化、解码器的输出
class Rnn_Local_Decoder(tf.keras.Model):
def __init__(self,embedding_dim,units,vocab_size):
super(Rnn_Local_Decoder)
self.utils = units
self.embedding = tf.keras.layers.Embedding(vocab_size,embedding_dim)
self.gru = tf.keras.layers.GRU(self.units,
return_sequence = True,
return_sattae = True,
recurrent_initializer = 'glorot_uniform')
self.fc1 = tf.kera.layers.Dense(self.units)
self.dropout = tf.keras.layers.Dropout(0.5,noise_shape = None,seed = None)
self.batchnormalization = tf.keras.layers.BatchNormalization(axis = -1,momentum = 0.99,
epsion= 0.001,center= True,
scale = True,beta_initializer = 'zeros',
gamma_initializer = 'ones',
moving_mean_initializer = 'zeros',
moving_variance_initializer = 'ones',
beta_regularizer = None,gamma_regularizer = None,
beta_constraint = None,
gamma_constaint = None)
self.fc2 = tf.keras.layers.Dense(vocab_size)
# 实现注意力机制
self.Uattn = tf.keras.layers.Dense(units)
self.Wattn = tf.keras.layers.Dense(units)
self.Vattn = tf.keras.layers.Dense(1)
def call(self,x,features,hidden):
# features shape == > (64, 49, 256) == > Output from ENCODER
# hidden shape == (batch_size, hidden_size) ==>(64,512)
# hidden_with_time_axis shape == (batch_size, 1, hidden_size) ==> (64,1,512)
hidden_with_time_axis = tf.expand_dims(hidden,1)
# score shape = (64,49,1)
# Attention Functoion
'''e(ij) == f(s(t - 1),h(j))'''
'''e(ij) = Vattn(T) * tanh(Uattn * h(j) + Wattn * s(t))'''
score = self.Vattn(tf.nn.tanh(self.Uattn(features) + self.Wattn(hidden_with_time_axis)))
attention_weights = tf.nn.softmax(score,axis = 1)
context_vector = attention_weights * features
context_vector = tf.reduce_sum(context_vector,axis = 1)
x = self.embedding(x)
x = tf.concat([tf.expand_dims(context_vector,1),x],axis = -1)
output,state = self.gru(x)
x = self.fc1(output)
x = tf.reshape(x,(-1,x.shape[2]))
x = self.dropout(x)
x = self.batchnormalization(x)
x = self.fc2(x)
return x,state,attention_weights
def reset_state(self,batch_size):
return tf.zeros((batch_size,self.units))
units = 512
encoder = VGG16_Encoder(embedding_dim)
decoder = Rnn_Local_Decoder(embedding_dim,units,vocab_size)
# (11)定义损失函数和优化器
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logiuts = True,reduction = 'none')
def loss_function(real,pred):
mask = tf.math.logical_not(tf.match.equal(real,0))
loss_ = loss_function(real,pred)
mask = tf.cast(mask,dtype = loss_.dtype)
loss_ *= mask
return tf.reduce_mean(loss_)
# 4.模型训练
# (1)定义训练步骤:将目标单词作为下一个输入传递给解码器——有助于快速了解正确的序列或序列的正确统计属性
loss_plot = []
def train_step(img_tensor,target):
loss = 0
hidden = decoder.reset_state(batch_size = target.shape[0])
dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * BATCH_SIZE,1)
with tf.GradientTape() as tape:
features = encoder(img_tensor)
for i in range(1,target.shape[1]):
predictions,hidden, _= decoder(dec_input,features,hidden)
loss += loss_function(target[:i],predictions)
dec_input = tf.expand_dims(target[:,i],1)
total_loss = (loss / int(target.shape[1]))
trainable_variables = encoder.trainable_variables + decoder.trainable_variables
gradients = tape.gradients(loss,trainable_variables)
optimizer.apply_gradients(zip(gradients,trainable_variables))
return loss,total_loss
# (2)训练模型
EPOCHS = 20
for epoch in range(start_epoch, EPOCHS):
start = time.time()
total_loss = 0
for (batch, (img_tensor, target)) in enumerate(dataset):
batch_loss, t_loss = train_step(img_tensor, target)
total_loss += t_loss
if batch % 100 == 0:
print('Epoch {} Batch {} Loss {:.4f}'.format(
epoch + 1, batch, batch_loss.numpy() / int(target.shape[1])))
# storing the epoch end loss value to plot later
loss_plot.append(total_loss / num_steps)
print('Epoch {} Loss {:.6f}'.format(epoch + 1,
total_loss / num_steps))
print('Time taken for 1 epoch {} sec\n'.format(time.time() - start))
# (3)绘制误差图
plt.plot(loss_plot)
plt.xlabel('Epochs')
plt.ylabel('Loss Plot')
plt.show()
# 5.贪婪搜索和BLEU 评估
# (1)定义字幕的贪婪方法
def evaluate(image):
attention_plot = np.zeros((max_length,attention_features_shape))
hidden = decoder.reset_state(batch_size = 1)
temp_input = tf.expand_dims(load_image(image[0],0))
img_tensor_val = image_features_extract_model(temp_input)
img_tensor_val = tf.reshape(img_tensor_val,(img_tensor_val.shape[0], -1,img_tensor_val.shape[3]))
features = encoder(img_tensor_val)
dec_input = tf.expand_dims([tokenizer.word_index['<start>']],0)
result = []
for i in range(max_length):
predictions,hidden ,attention_weights = decoder(dec_input,features,hidden)
attention_plot[i] = tf.reshape(attention_weights,(-1,)).numpy
predicted_id = tf.argmax(predictions[0]).numpy()
result.append(tokenizer.index_word[predicted_id])
if(tokenizer.index_word[predicted_id] == '<end>'):
return result,attention_plot
dec_input = tf.expand_dims([predicted_id],0)
attention_plot = attention_plot[:len(result),:]
return result,attention_plot
# (2)定义一个函数来绘制生成的每个单词的注意力图
def plot_attention(image,result,attention_plot):
tmp_image = np.array(Image.open(image))
fig = plt.figure(figsize=(10,10))
len_result = len(result)
for l in range(len_result):
tmp_att = np.resize(attention_plot[l],(8,8))
ax = fig.add_subplot(len_result // 2,len_result // 2,l + 1)
ax.set_title(result[l])
img = ax.imshow(tmp_image)
ax.imshow(tmp_att,camp = 'gray',alpha = 0.6,extent = img.get_extent())
plt.tight_layout()
plt.show()
# (3)为图片生成标题,以及每一步生成对应的注意力
rid = np.random.randint(0,len(img_name_val))
image = ''
real_caption = ' '.join([tokenizer.index_word[i] for i in cap_val[rid] if i not in [0]])
result,attention_plot = evaluate(image)
# 去除real_caption中的start 标记
first = real_caption.split('',1)[1]
real_caption = ""
# 去除结果集中的<unk>
for i in result:
if i == '<unk>':
result.remove(i)
for i in real_caption:
if i in real_caption:
real_caption.remove(i)
# 去除real_caption中的end标记
result_join = ''.join(result)
result_final = result_join.rsplit('',1)[0]
real_appn = []
real_appn.append(real_caption.split())
reference = real_appn
candidate = result
score = sentence_bleu(reference,candidate)
print(f"BLEU score:{score * 100}")
print('Real Caption :',real_caption)
print('Prediction Caption:',result_final)
plot_attention(image,result,attention_plot)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化