代码拉取完成,页面将自动刷新
import numpy as np
from z3 import *
from copy import copy
def matrics(True_X, infer_X):
#inferred_array由算法得出,real_array即真实信息,rounds表示测试了几轮
ave_FPR=0
ave_DR=0
ave_F1=0
ave_F2=0; ave_Prec=0
rounds = infer_X.shape[1]; real_array = True_X; inferred_array = infer_X
for round in range(rounds):
TP=0
FP=0
FN=0
TN=0
for link_index in range(len(real_array.T[round])):
if real_array.T[round][link_index]==0 and inferred_array.T[round][link_index]==0:
TN+=1
elif real_array.T[round][link_index]==0 and inferred_array.T[round][link_index]==1:
FP+=1
elif real_array.T[round][link_index]==1 and inferred_array.T[round][link_index]==1:
TP+=1
elif real_array.T[round][link_index]==1 and inferred_array.T[round][link_index]==0:
FN+=1
if TP+FN == 0:
DR=1
else:
DR=TP/(TP+FN)
if FP + TN == 0:
FPR=0
else:
FPR=FP/(FP+TN)
if TP+FP==0:
precision=1
else:
precision=TP/(TP+FP)
if TP+FN==0:
recall=1
else:
recall=TP/(TP+FN)
if TP == 0:
if sum(real_array.T[round])==0:
F1=1;F2=1
else:
F1=0;F2=0
else:
F1 = 2*precision*recall/(precision+recall)
F2 = 5*precision*recall/(4*precision+recall)
ave_FPR += FPR
ave_DR += DR
ave_F1 += F1
ave_F2 += F2; ave_Prec += precision
ave_FPR /=rounds
ave_DR /=rounds
ave_F1 /=rounds; ave_F2 /=rounds; ave_Prec /=rounds
return [ave_DR, ave_FPR, ave_F1, ave_F2, ave_Prec]
def alg_tomo(y: np.ndarray, A_rm: np.ndarray):
"""
tomo算法的调用接口
:param y:路径的观测状态,列向量;如为矩阵,横维度为时间,纵维度为链路状态
:param A_rm:为路由矩阵,纵纬度为路径,横维度为链路
:return: x_identified 列向量,如为矩阵,横维度为时间,纵维度为链路推测状态
"""
print("\r Tomo inferring...", end="")
if np.ndim(y) <= 1: # 强制转换为列向量
y = y.reshape((-1, 1))
_, n = A_rm.shape
_, num_time = y.shape
x_identified = np.zeros((n, num_time))
for i in range(num_time):
path_state_obv = y[:, i] # 按照行来取
links_state_inferred = TOMO_z3(path_state_obv, A_rm)
for j in range(links_state_inferred.shape[0]):
x_identified[j][i] = links_state_inferred[j]
return x_identified
def TOMO_z3(y, A_rm):
y_new = copy(y)
_, n = A_rm.shape
links_state_inferred = np.zeros(n, dtype=np.int8)
if np.where(y_new > 0)[0].size <= 0:
return np.zeros(n, dtype=np.int8)
# 根据 路径的测量结果对 链路进行统计
path_good_index, = np.where(y_new < 1)
link_good_index, = np.where(np.sum(A_rm[path_good_index, :], axis = 0) > 0)
path_fault_index, = np.where(y_new > 0)
# 将所有的路径分为两类,一类是经过故障链路的,一类是没有经过故障链路的
link_fault_sets = []
for index in path_fault_index:
link_fault_set = np.where(A_rm[index, :] > 0)
link_fault_set = link_fault_set[0].tolist()
link_fault_sets.append([x for x in link_fault_set if x not in link_good_index])
# 对故障链路组成的集和 去重
U_ = [x for set_ in link_fault_sets for x in set_]
U = list(set(U_))
# 若两者皆为空集 ,则返回
if len(U) <= 0 or len(link_fault_sets) <= 0:
return np.full(n, np.nan)
solver = Optimize()
# 对所有故障链路集和 中的元素,创建布尔变量
elements = {u: Bool(u) for u in U}
# 加入限制,使得 最小一致故障集 中的每个子集至少包含 故障链路集和 中的一个元素
for s in link_fault_sets:
subset_contains_element = Or([elements[u] for u in s])
solver.add(subset_contains_element)
# 最小化被选中的元素的数量
num_selected_elements = sum([If(elements[u], 1, 0) for u in U])
solver.minimize(num_selected_elements)
if solver.check() == sat:
model = solver.model()
selected_elements = [u for u in U if is_true(model[elements[u]])]
links_state_inferred[selected_elements] = 1
return np.array(links_state_inferred)
else:
return np.full(n, np.nan)
def get_paths(link: int, route_matrix: np.ndarray):
"""
获取进过链路link的所有路径
:param link:
:return: paths:list
"""
assert link >= 0
paths, = np.where(route_matrix[:, link] > 0)
return paths.tolist()
def test_tomo():
y = np.array([[0, 0, 0],
[0, 0, 1],
[1, 0, 0],
[0, 1, 1],
[1, 0, 0],
[0, 0, 1],
[1, 1, 0],
[1, 1, 1]], dtype=np.int8).T
A_rm = np.array([
[1, 0, 1, 1, 0],
[1, 0, 1, 0, 1],
[0, 1, 1, 0, 1]], dtype=np.int8)
links_state_inferred = alg_tomo(y, A_rm)
print(links_state_inferred.T)
if __name__ == '__main__':
test_tomo()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。