# torch_tools.py
import itertools
import time
import warnings

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data
from torch.utils.data.sampler import WeightedRandomSampler
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.utils import shuffle  # needed by get_train_test; missing in the original imports
def get_key(val, my_dict):
    """
    Simple function to get the key
    in a dictionary from its value.
    Input: value, dictionary
    Output: key (or "key doesn't exist" if the value is not found)
    """
    for key, value in my_dict.items():
        if val == value:
            return key
    return "key doesn't exist"
def one_hot(c, classes):
    """
    Simple one-hot encoding for the
    types of arrhythmia conditions.
    class --> encoded class
    'N' --> [1, 0, 0, 0, 0, 0, 0, 0]
    c:: current class of the object
    classes:: classes dictionary
    """
    enc = np.zeros(len(classes), dtype=int).tolist()
    enc[get_key(c, classes)] = 1
    return enc
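# Example (hypothetical `classes` mapping, assumed for illustration only; the
# real mapping is defined by the caller, not in this file):
# classes = {0: 'N', 1: 'S', 2: 'V', 3: 'F', 4: 'Q'}
# one_hot('V', classes)  # -> [0, 0, 1, 0, 0]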
def get_train_test(X, y, train_size, classes, patients=None):
    """
    Train/test split that guarantees every class is present in the test set,
    so per-class performance can be evaluated. Assumes `classes` maps integer
    keys 0..len(classes)-1 to class labels and that y[:, 2] holds each
    sample's class label. (`classes` and `patients` were undefined module-level
    globals in the original defaults; they are passed in explicitly here.)
    """
    # sample a test subset of size (1 - train_size) per class
    sub_c = {}
    for c in classes:
        C = np.argwhere(y[:, 2] == classes[c]).flatten()
        sub_c[c] = np.random.choice(C, int(C.shape[0] - C.shape[0] * train_size))
    X_test = np.vstack([X[sub_c[c]] for c in sorted(sub_c)])
    y_test = np.vstack([y[sub_c[c]] for c in sorted(sub_c)])
    # remove the sampled test rows from the training pool
    deletions = []
    for i in sub_c:
        deletions.extend(sub_c[i].tolist())
    X_train = np.delete(X, deletions, axis=0)
    y_train = np.delete(y, deletions, axis=0)
    X_train, y_train = shuffle(X_train, y_train, random_state=0)
    X_test, y_test = shuffle(X_test, y_test, random_state=0)
    # map class labels back to their integer keys
    y_train = np.array([get_key(y_i, classes) for y_i in y_train[:, 2]])
    y_test = np.array([get_key(y_i, classes) for y_i in y_test[:, 2]])
    return X_train, y_train, X_test, y_test
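# Usage sketch (illustrative; assumes X with shape (n_samples, n_features) and
# a y whose third column y[:, 2] carries the class label of each sample):
# X_train, y_train, X_test, y_test = get_train_test(
#     X, y, train_size=0.8, classes={0: 'N', 1: 'S', 2: 'V', 3: 'F', 4: 'Q'})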
def imbalanced_loader(X_train, y_train, X_test, y_test, valid_size=.05, batch_size=512):
    """
    Build trainloader, validloader, and testloader for model training. The
    train loader uses a WeightedRandomSampler so batches are class-balanced,
    while the validation and test sets keep their natural class distribution.
    Note the test set was previously augmented to get better per-class metrics.
    Outputs: dataloader, testloader, where dataloader = {"train": trainloader, "val": validloader}
    """
    warnings.filterwarnings("ignore")  # silences a noisy torch warning
    print('Getting Data... {}% Validation Set\n'.format(int(np.around(valid_size * 100))))

    # split train into train + validation
    num_train = len(X_train)
    indices = list(range(num_train))
    split = int(np.floor(valid_size * num_train))
    train_idx, valid_idx = indices[split:], indices[:split]
    print("Batch Size:", batch_size)
    print('\nTrain Len=', len(train_idx), ', Validation Len=', len(valid_idx), ', Test Len=', len(y_test))

    # inverse-frequency weights for class-balanced sampling
    # (the original indexed with y_train[[train_idx]], which adds a spurious
    # leading axis and breaks the sampler; plain fancy indexing is correct)
    class_sample_count = np.array([len(np.where(y_train[train_idx] == t)[0]) for t in np.unique(y_train[train_idx])])
    weight = 1. / class_sample_count
    samples_weight = torch.from_numpy(np.array([weight[t] for t in y_train[train_idx]])).double()
    train_sampler = WeightedRandomSampler(samples_weight, len(samples_weight))

    trainDataset = torch.utils.data.TensorDataset(
        torch.FloatTensor(X_train[train_idx]), torch.LongTensor(y_train[train_idx].astype(int)))
    # wrapping the sampler in a BatchSampler makes each dataset "sample" a full
    # batch of indices, so the DataLoader yields [batch_size, batch_size, ...] blocks
    train_sampler = torch.utils.data.BatchSampler(sampler=train_sampler, batch_size=batch_size, drop_last=True)
    trainloader = torch.utils.data.DataLoader(dataset=trainDataset, batch_size=batch_size, num_workers=1, sampler=train_sampler)

    valDataset = torch.utils.data.TensorDataset(
        torch.FloatTensor(X_train[valid_idx]), torch.LongTensor(y_train[valid_idx].astype(int)))
    sampler = torch.utils.data.RandomSampler(valDataset)
    sampler = torch.utils.data.BatchSampler(sampler, batch_size, drop_last=True)
    validloader = torch.utils.data.DataLoader(dataset=valDataset, batch_size=batch_size, num_workers=1, sampler=sampler)

    testset = []
    for i, x in enumerate(X_test):
        testset.append((torch.from_numpy(x), torch.tensor([y_test[i]])))
    testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=1)

    print("")
    dataloader = {"train": trainloader, "val": validloader}
    print('Train Size Batched=', int(len(dataloader['train'].dataset) / batch_size),
          ', Validation Size Batched=', int(len(dataloader['val'].dataset) / batch_size),
          ', Test Size Batched=', len(testloader))
    warnings.resetwarnings()
    return dataloader, testloader
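# Usage sketch (illustrative; consumes the arrays produced by get_train_test):
# dataloader, testloader = imbalanced_loader(X_train, y_train, X_test, y_test,
#                                            valid_size=0.05, batch_size=512)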
class Anomaly_Classifier(nn.Module):
    def __init__(self, input_size, num_classes):
        super(Anomaly_Classifier, self).__init__()
        self.conv = nn.Conv1d(in_channels=input_size, out_channels=32, kernel_size=5, stride=1)
        # note: this padded conv is shared (same weights) across all four residual blocks
        self.conv_pad = nn.Conv1d(in_channels=32, out_channels=32, kernel_size=5, stride=1, padding=2)
        self.drop_50 = nn.Dropout(p=0.5)
        self.maxpool = nn.MaxPool1d(kernel_size=5, stride=2)
        self.dense1 = nn.Linear(32 * 8, 32)
        self.dense2 = nn.Linear(32, 32)
        self.dense_final = nn.Linear(32, num_classes)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, x):
        residual = self.conv(x)

        # block 1
        x = F.relu(self.conv_pad(residual))
        x = self.conv_pad(x)
        x += residual
        x = F.relu(x)
        residual = self.maxpool(x)  # [512, 32, 90]

        # block 2
        x = F.relu(self.conv_pad(residual))
        x = self.conv_pad(x)
        x += residual
        x = F.relu(x)
        residual = self.maxpool(x)  # [512, 32, 43]

        # block 3
        x = F.relu(self.conv_pad(residual))
        x = self.conv_pad(x)
        x += residual
        x = F.relu(x)
        residual = self.maxpool(x)  # [512, 32, 20]

        # block 4
        x = F.relu(self.conv_pad(residual))
        x = self.conv_pad(x)
        x += residual
        x = F.relu(x)
        x = self.maxpool(x)  # [512, 32, 8]

        # MLP head
        x = x.view(-1, 32 * 8)  # flatten to (batch, 32 * 8)
        x = F.relu(self.dense1(x))
        # x = self.drop_50(x)
        x = self.dense2(x)
        x = self.softmax(self.dense_final(x))
        return x
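# Shape sanity check (a minimal sketch; the in-line shape comments above imply
# 187-sample heartbeat inputs, which is an assumption, not stated in this file):
# model = Anomaly_Classifier(input_size=1, num_classes=5)
# out = model(torch.randn(512, 1, 187))  # -> log-probabilities, shape [512, 5]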
def reset_weights(model):
    """
    model.apply(reset_weights) re-initializes all Conv1d/Linear parameters,
    so the same model object can be retrained from scratch.
    """
    if isinstance(model, nn.Conv1d) or isinstance(model, nn.Linear):
        model.reset_parameters()
def calc_accuracy(output, Y):
    # batch accuracy during training: fraction of argmax predictions that match Y
    max_vals, max_indices = torch.max(output, 1)
    train_acc = (max_indices == Y).sum().data.cpu().numpy() / max_indices.size()[0]
    return train_acc
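# Quick check (illustrative):
# calc_accuracy(torch.tensor([[0.9, 0.1], [0.2, 0.8]]), torch.tensor([0, 1]))  # -> 1.0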
def train_model(data_loader, model, criterion, optimizer, n_epochs=100, print_every=10, verbose=True, plot_results=True, validation=True):
    """
    Model Training Function.
    Input:
        data_loader: {'train': trainloader, 'val': validloader} --> if no validation set
            is used, pass validation=False and data_loader={'train': trainloader}
        model: model.cuda() if a GPU will be used, else the CPU model
        print_every: print every n epochs
        verbose: print out results per epoch
        plot_results: plot the train and validation loss
        validation: whether a validation set is present in data_loader
    Output:
        trained classifier
    """
    losses = []
    start = time.time()
    print('Training for {} epochs...\n'.format(n_epochs))
    for epoch in range(n_epochs):
        if verbose and epoch % print_every == 0:
            print('\n\nEpoch {}/{}:'.format(epoch + 1, n_epochs))
        evaluation = ['train', 'val'] if validation else ['train']
        # each epoch has a training and (optionally) a validation phase
        for phase in evaluation:
            model.train(phase == 'train')  # training mode vs. evaluate mode
            running_loss = 0.0
            # iterate over data; each outer batch holds batch_size inner batches
            # (a consequence of the BatchSampler wrapping in imbalanced_loader)
            for hb, labels in data_loader[phase]:
                for hb_index, label in enumerate(labels):
                    HB, label = hb[hb_index].unsqueeze(1).cuda(), label.cuda()
                    # forward pass
                    outputs = model(HB)
                    acc = calc_accuracy(outputs, label)
                    loss = criterion(outputs, label)
                    # zero the parameter (weight) gradients
                    optimizer.zero_grad()
                    # backward + optimize only in the training phase
                    if phase == 'train':
                        loss.backward()
                        # update the weights
                        optimizer.step()
                    # accumulate loss statistics
                    running_loss += loss.item()
            losses.append(running_loss)
            if verbose and epoch % print_every == 0:
                # note: acc is from the last batch of this phase
                print('{} loss: {:.4f} | acc: {:.4f}|'.format(phase, running_loss, acc), end=' ')
    if verbose:
        print('\nFinished Training | Time:{}'.format(time.time() - start))
    if plot_results:
        plt.figure(figsize=(10, 10))
        plt.plot(losses[0::2], label='train_loss')
        if validation:
            plt.plot(losses[1::2], label='validation_loss')
        plt.legend()
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.draw()
    return model
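# Usage sketch (illustrative; assumes a CUDA device and the loaders built by
# imbalanced_loader above):
# model = Anomaly_Classifier(input_size=1, num_classes=5).cuda()
# criterion = nn.NLLLoss()  # pairs with the model's LogSoftmax output
# optimizer = optim.Adam(model.parameters(), lr=0.001)
# trained = train_model(dataloader, model, criterion, optimizer, n_epochs=100)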
def evaluate(testloader, trained_model, verbose=True):
    """
    Evaluation metric platform. Feed in the trained model
    and the test loader; returns the classification report
    along with predictions and ground truths.
    """
    truth = []
    preds = []
    with torch.no_grad():  # no gradients are needed for evaluation
        for hb, label in testloader:
            HB, label = hb.float().unsqueeze(1).cuda(), label.cuda()
            outputs = trained_model(HB)
            _, predicted = torch.max(outputs, 1)
            preds.append(predicted.cpu().numpy().tolist())
            truth.append(label.cpu().numpy().tolist())
    # flatten per-batch lists into single prediction/truth lists
    preds_flat = [item for sublist in preds for item in sublist]
    truth_flat = [item for sublist in truth for item in sublist]
    if verbose:
        print('\nEvaluating....')
        print("TEST ACC:", accuracy_score(truth_flat, preds_flat))
        print(classification_report(truth_flat, preds_flat))
    return preds_flat, truth_flat
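# Usage sketch (illustrative; `trained` is the model returned by train_model):
# preds, truth = evaluate(testloader, trained)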
def plot_confusion_matrix(cm, classes,
normalize=False,
title='Confusion matrix',
cmap=plt.cm.Blues):
"""
This function prints and plots the confusion matrix.
Normalization can be applied by setting `normalize=True`.
"""
if normalize:
cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
print("Normalized confusion matrix")
else:
print('Confusion matrix, without normalization')
plt.figure(figsize=(10,10))
plt.imshow(cm, interpolation='nearest', cmap=cmap)
plt.title(title)
plt.colorbar()
tick_marks = np.arange(len(classes))
plt.xticks(tick_marks, classes, rotation=45)
plt.yticks(tick_marks, classes)
fmt = '.2f' if normalize else 'd'
thresh = cm.max() / 2.
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
plt.text(j, i, format(cm[i, j], fmt),
horizontalalignment="center",
color="white" if cm[i, j] > thresh else "black")
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
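# Usage sketch (illustrative; builds on the predictions from evaluate and the
# hypothetical 5-class mapping used above):
# cm = confusion_matrix(truth, preds)
# plot_confusion_matrix(cm, classes=['N', 'S', 'V', 'F', 'Q'], normalize=True)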
def variation(n_epochs, num_iters=5):
    """
    Examine model variance by retraining from scratch for num_iters iterations.
    Relies on module-level `dataloader` and `testloader` built by imbalanced_loader.
    """
    p = []
    t = []
    accuracy_scores = []
    for i in range(num_iters):
        print('\nModel {}/{}...\n'.format(i + 1, num_iters))
        # build a fresh 5-class model each iteration and reset its parameters
        anom_classifier = Anomaly_Classifier(input_size=1, num_classes=5).cuda()
        anom_classifier.apply(reset_weights)
        print('Weights Reset')
        criterion = nn.NLLLoss()
        optimizer = optim.Adam(anom_classifier.parameters(), lr=0.001)
        trained_classifier = train_model(data_loader=dataloader, model=anom_classifier,
                                         criterion=criterion, optimizer=optimizer,
                                         n_epochs=n_epochs, print_every=1, verbose=False,
                                         plot_results=False, validation=True)
        preds, truth = evaluate(testloader=testloader, trained_model=trained_classifier, verbose=False)
        t.append(truth)
        p.append(preds)
        print(accuracy_score(truth, preds))
        accuracy_scores.append(accuracy_score(truth, preds))
    return p, t, accuracy_scores
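# Usage sketch (illustrative parameter values):
# preds_list, truth_list, accs = variation(n_epochs=15, num_iters=5)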
def get_kernel_size(n_h, k_h, n_w, k_w, p_h=0, s_h=1, p_w=0, s_w=1):
    """
    Convolution output-size helper: given input size (n_h, n_w), kernel size
    (k_h, k_w), total padding (p_h, p_w), and stride (s_h, s_w), returns
    [out_h, out_w] with out = floor((n - k + p + s) / s).
    """
    return [int((n_h - k_h + p_h + s_h) / s_h), int((n_w - k_w + p_w + s_w) / s_w)]
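# Worked example (illustrative): a 187-sample beat through the unpadded
# kernel_size=5, stride=1 conv at the top of Anomaly_Classifier:
# get_kernel_size(n_h=1, k_h=1, n_w=187, k_w=5)  # -> [1, 183]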