import numpy as np
import torch
import random
import torch.utils.data
from torch.utils.data.sampler import WeightedRandomSampler
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
import time
import itertools
import warnings
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
from sklearn.metrics import classification_report
from sklearn.utils import shuffle  # used by get_train_test
def get_key(val, my_dict):
    """
    Simple function to look up the key in a dictionary
    that maps to a given value.
    Input: value, dictionary
    Output: key (or "key doesn't exist" if the value is absent)
    """
    for key, value in my_dict.items():
        if val == value:
            return key
    return "key doesn't exist"
def one_hot(c, classes):
    """
    Simple one-hot encoding for the
    types of arrhythmia conditions.
    class --> encoded class, e.g. with an 8-class dictionary:
    'N' --> [1, 0, 0, 0, 0, 0, 0, 0]
    c:: current class of the sample
    classes:: classes dictionary (int index -> class label)
    """
    enc = np.zeros(len(classes), dtype=int).tolist()
    enc[get_key(c, classes)] = 1
    return enc
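# Usage sketch (illustrative, not from this file): the class dictionary is
# assumed to map integer indices to beat labels, e.g. the five MIT-BIH groups.
# classes = {0: 'N', 1: 'S', 2: 'V', 3: 'F', 4: 'Q'}
# get_key('V', classes)   # -> 2
# one_hot('V', classes)   # -> [0, 0, 1, 0, 0]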
def get_train_test(X, y, train_size, classes=classes, patients=all_patients):
    """
    Train/test split that ensures every class is present in the test set,
    so per-class performance can be evaluated. Note: the `classes` and
    `all_patients` defaults refer to globals defined earlier in the pipeline.
    """
    sub_c = {}
    for c in classes:
        C = np.argwhere(y[:, 2] == list(classes.values())[c]).flatten()
        # sample without replacement so the test indices are unique
        sub_c[c] = np.random.choice(C, int(C.shape[0] - C.shape[0] * train_size), replace=False)
    X_test = np.vstack([X[sub_c[c]] for c in classes])
    y_test = np.vstack([y[sub_c[c]] for c in classes])
    deletions = []
    for c in sub_c:
        deletions.extend(sub_c[c].tolist())
    # drop the test rows from the training pool
    X_train = np.delete(X, deletions, axis=0)
    y_train = np.delete(y, deletions, axis=0)
    X_train, y_train = shuffle(X_train, y_train, random_state=0)
    X_test, y_test = shuffle(X_test, y_test, random_state=0)
    # map string labels (column 2) to integer class indices
    y_train = np.array([get_key(y_i, classes) for y_i in y_train[:, 2]])
    y_test = np.array([get_key(y_i, classes) for y_i in y_test[:, 2]])
    return X_train, y_train, X_test, y_test
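# Usage sketch (assumes X is an array of beat segments and y carries the
# string label in column 2, as the indexing above implies):
# X_train, y_train, X_test, y_test = get_train_test(X, y, train_size=0.8)
# After the split, y_train/y_test hold integer class indices 0..len(classes)-1.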
def imbalanced_loader(X_train, y_train, X_test, y_test, valid_size=.05, batch_size=512):
    """
    Build trainloader, validloader, and testloader for model training. The
    weighted sampler draws class-balanced training batches, while the
    validation and test sets keep their natural class distribution. Note the
    test set was previously constructed to contain every class so per-class
    metrics are meaningful.
    Outputs: dataloader + testloader, where dataloader = {"train": trainloader, "val": validloader}
    """
    warnings.filterwarnings("ignore")  # silence sampler-related warnings
    print('Getting Data... {}% Validation Set\n'.format(int(np.around(valid_size * 100))))
    # split train into train + validation
    num_train = len(X_train)
    indices = list(range(num_train))
    split = int(np.floor(valid_size * num_train))
    train_idx, valid_idx = indices[split:], indices[:split]
    print("Batch Size:", batch_size)
    print('\nTrain Len=', len(train_idx), ', Validation Len=', len(valid_idx), ', Test Len=', len(y_test))
    # inverse-frequency weights so each class is sampled equally often
    class_sample_count = np.array([len(np.where(y_train[train_idx] == t)[0]) for t in np.unique(y_train[train_idx])])
    weight = 1. / class_sample_count
    samples_weight = np.array([weight[t] for t in y_train[train_idx]])
    samples_weight = torch.from_numpy(samples_weight).double()
    train_sampler = WeightedRandomSampler(samples_weight, len(samples_weight))
    trainDataset = torch.utils.data.TensorDataset(torch.FloatTensor(X_train[train_idx]), torch.LongTensor(y_train[train_idx].astype(int)))
    # NOTE: the BatchSampler is passed as `sampler`, so each element the
    # DataLoader yields is itself a stack of batches; train_model unpacks
    # this nested structure with its inner loop.
    train_sampler = torch.utils.data.BatchSampler(sampler=train_sampler, batch_size=batch_size, drop_last=True)
    trainloader = torch.utils.data.DataLoader(dataset=trainDataset, batch_size=batch_size, num_workers=1, sampler=train_sampler)
    valDataset = torch.utils.data.TensorDataset(torch.FloatTensor(X_train[valid_idx]), torch.LongTensor(y_train[valid_idx].astype(int)))
    sampler = torch.utils.data.RandomSampler(valDataset)
    sampler = torch.utils.data.BatchSampler(sampler, batch_size, drop_last=True)
    validloader = torch.utils.data.DataLoader(dataset=valDataset, batch_size=batch_size, num_workers=1, sampler=sampler)
    testset = []
    for i, x in enumerate(X_test):
        testset.append((torch.from_numpy(x), torch.tensor(y_test[i])))
    testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size,
                                             shuffle=False, num_workers=1)
    print("")
    dataloader = {"train": trainloader, "val": validloader}
    print('Train Size Batched=', len(dataloader['train'].dataset) // batch_size,
          ', Validation Size Batched=', len(dataloader['val'].dataset) // batch_size,
          ', Test Size Batched=', len(testloader))
    warnings.resetwarnings()
    return dataloader, testloader
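# Usage sketch: these are the globals that train_model/variation expect.
# dataloader, testloader = imbalanced_loader(X_train, y_train, X_test, y_test,
#                                            valid_size=0.05, batch_size=512)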
class Anomaly_Classifier(nn.Module):
    def __init__(self, input_size, num_classes):
        super(Anomaly_Classifier, self).__init__()
        self.conv = nn.Conv1d(in_channels=input_size, out_channels=32, kernel_size=5, stride=1)
        # padding=2 keeps the sequence length unchanged inside each residual block
        self.conv_pad = nn.Conv1d(in_channels=32, out_channels=32, kernel_size=5, stride=1, padding=2)
        self.drop_50 = nn.Dropout(p=0.5)
        self.maxpool = nn.MaxPool1d(kernel_size=5, stride=2)
        self.dense1 = nn.Linear(32 * 8, 32)  # 8 = sequence length left after 4 pooling stages
        self.dense2 = nn.Linear(32, 32)
        self.dense_final = nn.Linear(32, num_classes)
        self.softmax = nn.LogSoftmax(dim=1)  # paired with nn.NLLLoss during training
    def forward(self, x):
        residual = self.conv(x)
        # block 1
        x = F.relu(self.conv_pad(residual))
        x = self.conv_pad(x)
        x += residual
        x = F.relu(x)
        residual = self.maxpool(x)  # [512, 32, 90]
        # block 2
        x = F.relu(self.conv_pad(residual))
        x = self.conv_pad(x)
        x += residual
        x = F.relu(x)
        residual = self.maxpool(x)  # [512, 32, 43]
        # block 3
        x = F.relu(self.conv_pad(residual))
        x = self.conv_pad(x)
        x += residual
        x = F.relu(x)
        residual = self.maxpool(x)  # [512, 32, 20]
        # block 4
        x = F.relu(self.conv_pad(residual))
        x = self.conv_pad(x)
        x += residual
        x = F.relu(x)
        x = self.maxpool(x)  # [512, 32, 8]
        # MLP head
        x = x.view(-1, 32 * 8)  # reshape to (batch, 32*8)
        x = F.relu(self.dense1(x))
        # x = self.drop_50(x)
        x = self.dense2(x)
        x = self.softmax(self.dense_final(x))
        return x
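# Shape sanity check (sketch): with 187-sample single-lead beats -- the length
# implied by the [.., 32, 90/43/20/8] comments above -- the four pooled blocks
# leave 32 channels x 8 steps, matching dense1's 32*8 input.
# m = Anomaly_Classifier(input_size=1, num_classes=5)
# out = m(torch.randn(2, 1, 187))   # -> torch.Size([2, 5]) of log-probabilities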
def reset_weights(model):
    """
    model.apply(reset_weights) will reset all Conv1d/Linear parameters,
    so repeated experiments start from a fresh initialization.
    """
    if isinstance(model, (nn.Conv1d, nn.Linear)):
        model.reset_parameters()
def calc_accuracy(output, Y):
    # batch accuracy during training: fraction of argmax predictions matching Y
    max_vals, max_indices = torch.max(output, 1)
    train_acc = (max_indices == Y).sum().data.cpu().numpy() / max_indices.size(0)
    return train_acc
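# Quick check (sketch): two samples, class 1 and class 0, both predicted correctly.
# logits = torch.tensor([[0.1, 0.9], [0.8, 0.2]])
# calc_accuracy(logits, torch.tensor([1, 0]))   # -> 1.0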
def train_model(data_loader, model, criterion, optimizer, n_epochs=100, print_every=10, verbose=True, plot_results=True, validation=True):
    """
    Model training function.
    Input:
        data_loader: {'train': trainloader, 'val': validloader} --> if no validation
            set is used, pass validation=False and data_loader={'train': trainloader}
        model: model.cuda() if a GPU will be used, else the CPU model
        print_every: print every n epochs
        verbose: print out results per epoch
        plot_results: plot the train and validation loss
        validation: whether a validation set is present in data_loader
    Output:
        trained classifier
    """
    losses = []
    start = time.time()
    print('Training for {} epochs...\n'.format(n_epochs))
    for epoch in range(n_epochs):
        if verbose and epoch % print_every == 0:
            print('\n\nEpoch {}/{}:'.format(epoch + 1, n_epochs))
        evaluation = ['train', 'val'] if validation else ['train']
        # each epoch has a training and (optionally) a validation phase
        for phase in evaluation:
            if phase == 'train':
                model.train()  # set model to training mode
            else:
                model.eval()   # set model to evaluation mode
            running_loss = 0.0
            # iterate over data; the loaders yield stacks of batches
            # (see imbalanced_loader), so unpack one batch at a time
            for hb, labels in data_loader[phase]:
                for hb_index, label in enumerate(labels):
                    HB, label = hb[hb_index].unsqueeze(1).cuda(), label.cuda()
                    # forward pass
                    outputs = model(HB)
                    acc = calc_accuracy(outputs, label)  # accuracy of the current batch
                    loss = criterion(outputs, label)
                    # zero the parameter (weight) gradients
                    optimizer.zero_grad()
                    # backward + optimize only in the training phase
                    if phase == 'train':
                        loss.backward()
                        # update the weights
                        optimizer.step()
                    # accumulate loss statistics
                    running_loss += loss.item()
            losses.append(running_loss)
            if verbose and epoch % print_every == 0:
                print('{} loss: {:.4f} | acc: {:.4f}|'.format(phase, running_loss, acc), end=' ')
    if verbose:
        print('\nFinished Training | Time:{}'.format(time.time() - start))
    if plot_results:
        plt.figure(figsize=(10, 10))
        if validation:
            # losses alternate train/val per epoch, so de-interleave them
            plt.plot(losses[0::2], label='train_loss')
            plt.plot(losses[1::2], label='validation_loss')
        else:
            plt.plot(losses, label='train_loss')
        plt.legend()
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.draw()
    return model
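# Usage sketch: NLLLoss pairs with the model's LogSoftmax output layer.
# anom_classifier = Anomaly_Classifier(input_size=1, num_classes=5).cuda()
# criterion = nn.NLLLoss()
# optimizer = optim.Adam(anom_classifier.parameters(), lr=0.001)
# trained = train_model(dataloader, anom_classifier, criterion, optimizer, n_epochs=100)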
def evaluate(testloader, trained_model, verbose=True):
    """
    Evaluation metric platform. Feed in the trained model
    and test loader data.
    Returns the classification report along with
    predictions and ground truths.
    """
    truth = []
    preds = []
    trained_model.eval()  # disable dropout for inference
    with torch.no_grad():
        for hb, label in testloader:
            HB, label = hb.float().unsqueeze(1).cuda(), label.cuda()
            outputs = trained_model(HB)
            _, predicted = torch.max(outputs, 1)
            preds.append(predicted.cpu().numpy().tolist())
            truth.append(label.cpu().numpy().tolist())
    preds_flat = [item for sublist in preds for item in sublist]
    truth_flat = [item for sublist in truth for item in sublist]
    if verbose:
        print('\nEvaluating....')
        print("TEST ACC:", accuracy_score(truth_flat, preds_flat))
        print(classification_report(truth_flat, preds_flat))
    return preds_flat, truth_flat
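# Usage sketch: returns flat prediction/truth lists for further analysis,
# assuming the `trained` model and `testloader` from the sketches above.
# preds, truth = evaluate(testloader, trained)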
def plot_confusion_matrix(cm, classes,
normalize=False,
title='Confusion matrix',
cmap=plt.cm.Blues):
"""
This function prints and plots the confusion matrix.
Normalization can be applied by setting `normalize=True`.
"""
if normalize:
cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
print("Normalized confusion matrix")
else:
print('Confusion matrix, without normalization')
plt.figure(figsize=(10,10))
plt.imshow(cm, interpolation='nearest', cmap=cmap)
plt.title(title)
plt.colorbar()
tick_marks = np.arange(len(classes))
plt.xticks(tick_marks, classes, rotation=45)
plt.yticks(tick_marks, classes)
fmt = '.2f' if normalize else 'd'
thresh = cm.max() / 2.
for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
plt.text(j, i, format(cm[i, j], fmt),
horizontalalignment="center",
color="white" if cm[i, j] > thresh else "black")
plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
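# Usage sketch: build the matrix with sklearn, then plot. The class-name list
# is an assumption matching the 5-class dictionary used elsewhere.
# cm = confusion_matrix(truth, preds)
# plot_confusion_matrix(cm, classes=['N', 'S', 'V', 'F', 'Q'], normalize=True)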
def variation(n_epochs, num_iters=5):
    """
    Examine model variance by retraining from scratch for num_iters runs.
    Relies on the global `dataloader` and `testloader` built by
    imbalanced_loader.
    """
    p = []
    t = []
    accuracy_scores = []
    for i in range(num_iters):
        print('\nModel {}/{}...\n'.format(i + 1, num_iters))
        # fresh model each run; reset_weights guards against reusing parameters
        anom_classifier = Anomaly_Classifier(input_size=1, num_classes=5).cuda()
        anom_classifier.apply(reset_weights)
        print('Weights Reset')
        criterion = nn.NLLLoss()
        optimizer = optim.Adam(anom_classifier.parameters(), lr=0.001)
        trained_classifier = train_model(data_loader=dataloader, model=anom_classifier,
                                         criterion=criterion, optimizer=optimizer,
                                         n_epochs=n_epochs, print_every=1, verbose=False, plot_results=False,
                                         validation=True)
        preds, truth = evaluate(testloader=testloader, trained_model=trained_classifier, verbose=False)
        t.append(truth)
        p.append(preds)
        print(accuracy_score(truth, preds))
        accuracy_scores.append(accuracy_score(truth, preds))
    return p, t, accuracy_scores
def get_kernel_size(n_h, k_h, n_w, k_w, p_h=0, s_h=1, p_w=0, s_w=1):
    """
    Compute the output [height, width] of a convolution/pooling layer
    from input size n, kernel size k, total padding p, and stride s:
    out = floor((n - k + p + s) / s).
    """
    return [int((n_h - k_h + p_h + s_h) / s_h), int((n_w - k_w + p_w + s_w) / s_w)]
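# Worked example (sketch): reproduce the sequence lengths annotated in
# Anomaly_Classifier.forward for a 187-sample beat. The width arguments are
# dummies since the signal is 1-D.
# get_kernel_size(187, 5, 1, 1)            # initial conv -> [183, 1]
# get_kernel_size(183, 5, 1, 1, s_h=2)     # maxpool 1    -> [90, 1]
# get_kernel_size(90, 5, 1, 1, s_h=2)      # maxpool 2    -> [43, 1]
# get_kernel_size(43, 5, 1, 1, s_h=2)      # maxpool 3    -> [20, 1]
# get_kernel_size(20, 5, 1, 1, s_h=2)      # maxpool 4    -> [8, 1]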