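# NOTE: web-page residue captured with this file ("Code pull complete; the page will refresh automatically."). Not part of the program.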
from Topology_zoo import *
def CosineDistance(x, y):
    """Return the Euclidean (L2) distance between ``x`` and ``y``.

    Despite its name, this function does not compute a cosine distance:
    it returns the norm of the element-wise difference of the two inputs.

    Args:
        x: Array-like of numbers.
        y: Array-like of numbers, broadcast-compatible with ``x``.

    Returns:
        The norm of ``x - y`` as computed by ``np.linalg.norm``.
    """
    difference = np.array(x) - np.array(y)
    return np.linalg.norm(difference)
def matrics(True_X, infer_X, flag=True):
    """Compute binary-classification metrics of ``infer_X`` against ``True_X``.

    Entries equal to 1 are treated as positives and entries equal to 0 as
    negatives; any other value is ignored by all four confusion counts.

    Args:
        True_X: Array-like of ground-truth 0/1 states. ``None`` is tolerated
            and yields zero for every confusion count.
        infer_X: Array-like of inferred 0/1 states, broadcast-compatible
            with ``True_X``.
        flag: When true (the default) every metric is rounded to 4 decimal
            places; when false the unrounded values are returned.

    Returns:
        ``[DR, FPR, F1, F2, precision]`` where ``DR`` is the detection rate
        (recall). ``DR``, ``precision``, ``F1`` and ``F2`` fall back to 0 when
        their denominator is 0; ``FPR`` falls back to 1 when there are no
        negatives in ``True_X``.
    """
    # Coerce to arrays so that plain Python lists compare element-wise
    # instead of collapsing to a single scalar ``False``.
    truth = np.asarray(True_X)
    inferred = np.asarray(infer_X)

    TP = np.count_nonzero((truth == 1) & (inferred == 1))
    TN = np.count_nonzero((truth == 0) & (inferred == 0))
    FP = np.count_nonzero((truth == 0) & (inferred == 1))
    FN = np.count_nonzero((truth == 1) & (inferred == 0))

    DR = TP / (TP + FN) if (TP + FN) != 0 else 0
    FPR = FP / (FP + TN) if (FP + TN) != 0 else 1
    precision = TP / (TP + FP) if (TP + FP) != 0 else 0

    if (precision + DR) != 0:
        F1 = (2 * precision * DR) / (precision + DR)
        F2 = (5 * precision * DR) / (4 * precision + DR)
    else:
        F1 = 0
        F2 = 0

    values = [DR, FPR, F1, F2, precision]
    if flag:
        return [round(value, 4) for value in values]
    # Previously this path fell through and returned None.
    return values
class Heuristics:
    """Heuristic inference of per-link congestion from path observations.

    Matrix layout, as used by the indexing in this class:
      * ``route_martix``      -- (paths, links); entry 1 means the link is on
        the path.
      * ``obs_martix``        -- (paths, T); 1 marks a congested path
        observation, 0 a good one.
      * ``infer_X``           -- (links, T); inferred link state, 1 congested.
      * ``identified_martix`` -- (links, T); 1 once a link state is decided.
      * ``Y_t``               -- (paths, T); number of still-undecided links
        on each path whose observation is 1.
    """

    def __init__(self, obs_martix, route_martix, pri_cong_prob, True_X=None):
        """Store private copies of the inputs and reset all bookkeeping.

        Args:
            obs_martix: (paths, T) observation matrix.
            route_martix: (paths, links) routing matrix.
            pri_cong_prob: Per-link prior congestion probability.
            True_X: Optional (links, T) ground truth, used only for metrics.
        """
        self.obs_martix = obs_martix.copy()
        self.route_martix = route_martix.copy()
        self.identified_martix = None
        self.infer_X = None
        # Keep the supplied ground truth; it used to be discarded, which made
        # every metric be computed against None.
        self.True_X = None if True_X is None else True_X.copy()
        self.a = 0.5  # weight of the mean-congestion term in energy()
        self.round = 20  # maximum number of simulation rounds in run()
        self.Y_t = None  # uncertainty matrix
        self.pri_cong_prob = pri_cong_prob.copy()
        self.history = []
        self.matrics = []
        self.energy_history = []
        self.identity_history = []
        self.accuracy_history = []
        self.link_history = []

    def _links_on_path(self, path):
        """Return the indices of the links that ``path`` traverses."""
        return np.where(self.route_martix[path, :] == 1)[0]

    def _time_columns(self, t_):
        """Return the time columns to process: all of them, or just ``t_``."""
        if t_ is None:
            return range(self.obs_martix.shape[1])
        return (t_,)

    def init_infer_X(self):
        """Initialise the working matrices and record the initial metrics.

        Every link starts as congested and undecided; links on good paths are
        then marked good via ``identified_zeros``.

        Returns:
            The initial ``infer_X`` matrix (the same object kept on ``self``).
        """
        m, n = self.route_martix.shape
        T = self.obs_martix.shape[1]
        self.identified_martix = np.zeros((n, T))
        self.Y_t = np.zeros((m, T))
        self.infer_X = np.ones((n, T))
        # Links on paths observed as good are good.
        self.identified_zeros()
        self.matrics.append(matrics(self.True_X, self.infer_X))
        return self.infer_X

    def energy(self):
        """Return ``(energy, link_n, result)`` for the current inference.

        ``link_n`` is the mean over links of the inferred congestion
        frequency, ``result`` is ``CosineDistance`` between the prior and the
        inferred per-link frequencies, and ``energy`` blends the two with
        weight ``self.a``.
        """
        X = self.infer_X
        # Inferred congestion frequency of every link.
        cong_prob = np.sum(X, axis=1) / X.shape[1]
        link_n = np.sum(cong_prob) / X.shape[0]
        result = CosineDistance(self.pri_cong_prob, cong_prob)
        return link_n * self.a + (1 - self.a) * result, link_n, result

    def identified_zeros(self):
        """Mark every link on a path observed as 0 as good and decided."""
        for t in range(self.obs_martix.shape[1]):
            for path in np.where(self.obs_martix[:, t] == 0)[0]:
                links = self._links_on_path(path)
                self.infer_X[links, t] = 0
                self.identified_martix[links, t] = 1

    def calc_uncertainty(self):
        """Return the fraction of ``identified_martix`` entries still 0."""
        return 1 - np.sum(self.identified_martix) / self.identified_martix.size

    def update_Y_t(self, t_=None):
        """Recount undecided links for every path observed as 1.

        Args:
            t_: Time column to refresh, or ``None`` to refresh all columns.
        """
        for t in self._time_columns(t_):
            for path in np.where(self.obs_martix[:, t] == 1)[0]:
                links = self._links_on_path(path)
                self.Y_t[path, t] = np.count_nonzero(
                    self.identified_martix[links, t] == 0
                )

    def find_one_uncertainty(self, t):
        """Return the paths with exactly one undecided link at time ``t``.

        Column ``t`` of ``Y_t`` is refreshed first.

        Returns:
            A list of path indices, or ``None`` when there is no such path.
        """
        self.update_Y_t(t)
        path_index_set = np.where(self.Y_t[:, t] == 1)[0]
        if len(path_index_set) == 0:
            return None
        return path_index_set.tolist()

    def update_uncertainty(self, t_=None):
        """Settle every path that has exactly one undecided link.

        All links of such a path are marked decided (``infer_X`` is left
        untouched) and the path's ``Y_t`` entry is cleared.

        Args:
            t_: Time column to process, or ``None`` to process all columns.
        """
        for t in self._time_columns(t_):
            # Query once per column; the path set is fixed for this pass.
            paths = self.find_one_uncertainty(t)
            if paths is None:
                continue
            for path in paths:
                self.identified_martix[self._links_on_path(path), t] = 1
                self.Y_t[path, t] = 0

    def statistic_uncertainty(self):
        """Return time columns with non-zero total uncertainty, ascending.

        Columns are ordered by the column sum of ``Y_t``; columns whose sum is
        0 are dropped.
        """
        sum_ = np.sum(self.Y_t, axis=0)
        order = np.argsort(sum_)
        return order[sum_[order] != 0]

    def calc_delat_prob(self):
        """Return links whose inferred frequency exceeds the prior.

        Links are ordered by decreasing excess (inferred congestion frequency
        minus prior); links with no positive excess are dropped.
        """
        infer_prob = np.sum(self.infer_X, axis=1) / self.infer_X.shape[1]
        delat = infer_prob - self.pri_cong_prob
        delat[delat <= 0] = 0
        order = np.argsort(delat)[::-1]
        return order[delat[order] != 0]

    def check_stop(self):
        """Settle single-unknown paths, then report whether ``Y_t`` is all 0."""
        self.update_uncertainty()
        return bool(np.sum(self.Y_t) == 0)

    def check_fit(self):
        """Return True when the inference still explains every observation.

        That is, ``route_martix @ infer_X >= obs_martix`` holds element-wise.
        """
        return bool(
            (np.dot(self.route_martix, self.infer_X) >= self.obs_martix).all()
        )

    def simulation(self):
        """Run one greedy round that flips undecided links from 1 to 0.

        Only the time columns in the upper part of the uncertainty ranking
        (from position ``int(len * 0.67)`` onward) are processed. For each
        path with undecided links, the candidate with the largest excess over
        its prior is set to good, provided ``check_fit`` still holds; a flip
        that breaks the fit is reverted and the next candidate is tried.
        Appends ``(changed_true, changed_)`` to ``accuracy_history``.

        NOTE(review): the source this was restored from had lost its
        indentation; the loop nesting below was reconstructed from the control
        flow and should be confirmed against the original author's version.
        """
        self.update_Y_t()
        sorted_index = self.statistic_uncertainty()
        sorted_index = sorted_index[int(len(sorted_index) * 0.67):]
        changed_ = 0
        changed_true = 0  # never incremented: the ground-truth check is disabled
        for t in sorted_index:
            for path in range(self.obs_martix.shape[0]):
                # Earlier flips in this column change the counts; refresh.
                self.update_Y_t(t)
                if self.Y_t[path, t] < 1:
                    continue
                candidates = {
                    int(link)
                    for link in self._links_on_path(path)
                    if self.identified_martix[link, t] == 0
                }
                ranked_links = self.calc_delat_prob().tolist()
                # No undecided link exceeds its prior: stop working on this
                # time column.
                if not any(link in candidates for link in ranked_links):
                    break
                for link in ranked_links:
                    if link not in candidates:
                        continue
                    self.infer_X[link, t] = 0  # 1 --> 0
                    self.identified_martix[link, t] = 1
                    changed_ += 1
                    if self.check_fit():
                        break
                    # The flip broke an observation: undo and try the next.
                    self.infer_X[link, t] = 1
                    self.identified_martix[link, t] = 0
                    changed_ -= 1
        self.accuracy_history.append((changed_true, changed_))

    def run(self, init_=True):
        """Run the full heuristic and return the inferred ``infer_X``.

        Performs at most ``self.round`` simulation rounds and stops early once
        a round keeps fewer than 5 flips.

        Args:
            init_: Unused; kept for interface compatibility.
        """
        print("\r Heu inferring .....")
        # init_infer_X() already applies identified_zeros().
        self.init_infer_X()
        self.update_uncertainty()
        for _ in range(self.round):
            self.simulation()
            self.matrics.append(matrics(self.True_X, self.infer_X))
            self.energy_history.append(self.energy())
            if self.accuracy_history[-1][1] < 5:
                break
        return self.infer_X

    def links_matrics(self):
        """Append per-link metrics to ``link_history``.

        Indexes ``True_X`` by link, so ground truth must have been supplied.
        """
        links = [
            matrics(self.True_X[cnt], self.infer_X[cnt], True)
            for cnt in range(self.route_martix.shape[1])
        ]
        self.link_history.append(links)

    def Evaluate(self):
        """Print overall metrics and per-link inferred congestion frequency."""
        print("---------Heu-------------")
        print("Heu-matrics", matrics(self.True_X, self.infer_X))
        prob = np.sum(self.infer_X, axis=1) / self.infer_X.shape[1]
        print("Cong prob", prob)
        # NOTE(review): the meaning of the 10000 scale factor is not
        # established by this file.
        print("Cong Links", np.sum(prob) * 10000)
def alg_heu(obs_martix, route_martix, pri_cong_prob):
    """Run the full heuristic inference and return the inferred link states.

    Builds a ``Heuristics`` instance from the three inputs and returns the
    result of its ``run()`` method.
    """
    return Heuristics(obs_martix, route_martix, pri_cong_prob).run()
def alg_heu_init(obs_martix, route_martix, pri_cong_prob):
    """Return only the initial inference, before any simulation round.

    Builds a ``Heuristics`` instance, runs its ``init_infer_X()`` step and
    returns the resulting ``infer_X`` matrix.
    """
    heu = Heuristics(obs_martix, route_martix, pri_cong_prob)
    heu.init_infer_X()
    # Read the matrix from the instance rather than relying on the return
    # value of init_infer_X(), which yielded None here.
    return heu.infer_X
if __name__ == '__main__':
    # Build the network from the Agis topology and simulate it. The seed is
    # set after the topology is loaded and before paths are deployed, exactly
    # as in the original statement order.
    net = 网络基类()
    net.配置拓扑("./topology_zoo/topology_zoo数据集/Agis.gml")
    np.random.seed(10)
    net.部署测量路径(源节点列表=[16, 0, 12])
    net.配置参数(异构链路先验拥塞概率=0.10)
    net.运行网络(运行的总时间=1000)

    # Collect the per-link priors, the routing matrix and the run logs.
    pri_cong_prob = np.array([link.先验拥塞概率 for link in net.链路集和])
    raw_route = net.路由矩阵
    raw_path_state = net.运行日志['路径状态']
    raw_link_state = net.运行日志['链路状态']
    print('mean_prob', np.mean(pri_cong_prob))

    # Convert to 0/1 integers. Truthy route entries become 1; for the two
    # state logs truthy entries become 0 and falsy entries become 1.
    route_martix = np.where(raw_route, 1, 0)
    obs_martix = np.where(raw_path_state, 0, 1)
    True_X = np.where(raw_link_state, 0, 1)

    # Infer link states and score them against the logged link states.
    inferred_links = alg_heu(obs_martix, route_martix, pri_cong_prob)
    print(inferred_links)
    print(matrics(True_X, inferred_links))
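# NOTE: web-page residue captured with this file (hosting-site content-moderation notice: "Content that may be unsuitable for display is hidden here; you can review and edit it yourself.").
# NOTE: (continued) "If you confirm the content contains nothing inappropriate or unlawful, you may submit an appeal and it will be processed as soon as possible." Not part of the program.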