加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
client_api.py 24.57 KB
一键复制 编辑 原始数据 按行查看 历史
jinyuzhu 提交于 2022-04-22 15:19 . fix: display None edgetype
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596
import json
import os
import sys
import pickle
from src.vcs.commons import VCSException
from src.vcs.vcs import VCS
from src.vcs.vcs_neo4j_opt import convert_to_neo4j, commit_from_neo4j
from src.config import DataSet
from src.ngraph.ngraph import NGraphMeta
from src.graph.graph import ProvenanceGraphProxy
from src.graph.node import Node
from src.graph.edge import Edge
from collections import deque
class OutVO:
    """Response envelope serialised as one compact JSON object by the CLI entry point."""

    success: bool  # whether the command completed
    data: dict  # command-specific payload; may be None
    message: str  # status or error text; may be None

    def __init__(self, success: bool, data=None, message=None):
        self.success, self.data, self.message = success, data, message

    def to_json(self):
        """Return ``{"success":..,"data":..,"message":..}`` with no spaces and non-ASCII kept as-is."""
        payload = dict(success=self.success, data=self.data, message=self.message)
        return json.dumps(payload, ensure_ascii=False, separators=(',', ':'))
class VCSClient:
    """Facade over a ``VCS`` working copy plus optional Neo4j connection settings.

    Its methods are looked up by name and called without arguments by the ``vcs_client``
    command of the CLI entry point.
    """

    vcs: VCS
    username: str
    neo_host: str
    neo_username: str
    neo_password: str
    neo_home_import: str
    neo_comment: str

    def __init__(self, set_path: str, username: str = None,
                 neo_host=None, neo_username=None, neo_password=None, neo_home_import=None, neo_comment=None):
        self.vcs = VCS(set_path)
        self.username = username
        self.neo_host, self.neo_username, self.neo_password = neo_host, neo_username, neo_password
        self.neo_home_import = neo_home_import
        self.neo_comment = neo_comment

    def status(self):
        """Return whatever ``VCS.state()`` reports for this working copy."""
        return self.vcs.state()

    def fetch(self):
        """Delegate to ``VCS.fetch`` as the configured user."""
        self.vcs.fetch(username=self.username)

    def pull(self):
        """Delegate to ``VCS.pull`` as the configured user."""
        self.vcs.pull(username=self.username)

    def push(self):
        """Delegate to ``VCS.push`` as the configured user."""
        self.vcs.push(username=self.username)

    def _neo_import_path(self):
        """Return ``neo_home_import``; raise VCSException when it is None or empty."""
        if not self.neo_home_import:
            raise VCSException('Please set neo4j fast export path')
        return self.neo_home_import

    def neo_export(self):
        """Export the working copy to Neo4j through ``convert_to_neo4j``."""
        import_path = self._neo_import_path()
        convert_to_neo4j(self.vcs, self.neo_host, self.neo_username, self.neo_password, import_path)

    def neo_import(self):
        """Commit Neo4j contents back via ``commit_from_neo4j``; the default message is 'commit from neo4j'."""
        import_path = self._neo_import_path()
        comment = self.neo_comment or 'commit from neo4j'
        commit_from_neo4j(self.vcs, self.neo_host, self.neo_username, self.neo_password, comment, import_path)

    def source_info(self):
        """Map ``{hostname: {sourcename: path to that source's .pkl}}`` from ``<set>.data``.

        The dataset descriptor ``<base_path>/<set>.data`` is unpickled; ``<set>`` is the
        basename of ``vcs.base_path``.
        """
        base_path = self.vcs.base_path
        set_name = os.path.basename(base_path)
        with open(os.sep.join([base_path, f"{set_name}.data"]), 'rb') as data_file:
            # NOTE(review): pickle.load is only safe on trusted, locally produced files.
            dataset: DataSet = pickle.load(data_file)
        return {
            host_name: {
                src_name: os.sep.join(
                    [base_path, host_name, src_name, f"{set_name}-{host_name}-{src_name}.pkl"])
                for src_name, _ in host.sources.items()
            }
            for host_name, host in dataset.machines.items()
        }
def structure_to_vo(set_name: str, origin: dict) -> dict:
    """Convert ``{host: {source: path}}`` into a cascader-style tree.

    The root is ``{'value': set_name, 'label': set_name, 'children': [...]}``.
    There is one child per host, shaped ``{'value', 'label', 'children'}``.
    Each host has one leaf per source, shaped ``{'label': source, 'value': path}``.
    """
    host_entries = [
        {
            'value': host_name,
            'label': host_name,
            'children': [{'label': src_name, 'value': src_path} for src_name, src_path in sources.items()],
        }
        for host_name, sources in origin.items()
    ]
    return {'value': set_name, 'label': set_name, 'children': host_entries}
def print_stdout(body: str):
    """Write ``body`` and a newline to ``sys.__stdout__``, then flush.

    ``sys.__stdout__`` is used because the CLI entry point points ``sys.stdout``
    at ``os.devnull``.
    """
    real_out = sys.__stdout__
    real_out.write(body + '\n')
    real_out.flush()
def track_graph(pgp: ProvenanceGraphProxy, _start_str, _back, _front, _max_next):
    """Extract a sub-graph of ``pgp`` around some start nodes by back-tracking, then front-tracking.

    Back-tracking runs for at most ``_back`` hops (breadth-first, level by level). It follows
    in-edges to predecessor nodes, but only edges whose time (None counts as 0) is not later
    than the time recorded for the node being expanded. Start nodes get ``+inf``. A newly
    reached node records the max time of the edges that reached it.

    Front-tracking runs for at most ``_front`` hops. It follows out-edges to successor nodes,
    but only edges whose time is not earlier than the node's recorded minimum. Seeding:
    - if ``_back == 0``, the start nodes are seeded with time 0;
    - otherwise every node already in the result is seeded with the minimum time of its
      in-edges in the result (0 if it has none).

    Pruning, per expanded node: at most ``_max_next`` neighbours that are not yet in the
    result are kept, highest total degree (in + out) first. Neighbours already in the result
    are always kept.

    Args:
        pgp: the full provenance graph to walk.
        _start_str: comma-separated node ids, each parsed with ``int``; at most 10.
        _back: number of back-tracking hops; must be >= 0.
        _front: number of front-tracking hops.
        _max_next: per-node cap on newly added neighbours.

    Returns:
        ``(graph_result, new_node_set, new_edge_set)``. ``graph_result`` is a temp graph built
        with ``ProvenanceGraphProxy.create_temp_graph``. The two sets are reset at the start of
        every hop, so they hold the node/edge ids added during the last executed hop only.

    Raises:
        VCSException: more than 10 ids, an id not present in ``pgp``, or ``_back < 0``.
    """
    origin_back = _back
    # NOTE(review): a non-integer id raises ValueError here, not VCSException.
    _id_list = [int(_id_str) for _id_str in _start_str.split(',')]
    # NOTE(review): up to 10 ids are accepted; the message "< 10" understates the limit.
    if len(_id_list) > 10:
        raise VCSException("node id list length < 10")
    new_node_set = set()
    new_edge_set = set()
    # BackTracking
    back_iter = deque()
    # node id -> latest edge time that may still be followed backwards from that node
    time_map = {}
    graph_result = ProvenanceGraphProxy.create_temp_graph()
    if _back > 0:
        for _id in _id_list:
            if _id in graph_result.nodes:  # de-duplicate repeated start ids
                continue
            if _id not in pgp.nodes:
                raise VCSException(f"node id {_id} not exists")
            start_node = pgp.find_node(_id)
            back_iter.append(start_node)
            graph_result._replica_add_node(start_node.id, start_node.name, start_node.time)
            time_map[start_node.id] = float('inf')
    elif _back < 0:
        raise VCSException("back should >= 0")
    while not len(back_iter) == 0 and _back > 0:
        _back -= 1
        back_iter_size = len(back_iter)
        new_node_set = set()
        new_edge_set = set()
        curr_iter_new = set()  # nodes already queued for the next hop (avoids duplicates)
        for _ in range(back_iter_size):
            node: Node = back_iter.popleft()
            node_new = graph_result.find_node(node.id)
            # Pruning: gather candidate predecessors before capping them with _max_next
            target_node_dict = {}  # { node_id: (node, degree, [edge_id_list]) }
            existed_node_dict = {}  # { node_id: (node, degree, [edge_id_list]) }
            for in_edge in node.in_edges.values():
                # skip self-loops
                if in_edge.start_id == in_edge.end_id:
                    continue
                # skip relations already copied into the result graph
                if in_edge.id in node_new.in_edges:
                    continue
                edge_time = 0 if in_edge.time is None else in_edge.time
                # an edge later than this node's recorded time cannot have influenced it
                if edge_time > time_map[node_new.id]:
                    continue
                prior_node = pgp.find_node(in_edge.start_id)
                # If prior_node is already in the result graph, its edges are always added
                # (they bypass the _max_next cap below).
                if prior_node.id in graph_result.nodes:
                    if prior_node.id not in existed_node_dict:
                        existed_node_dict[prior_node.id] = (
                            prior_node, len(prior_node.in_edges.keys()) + len(prior_node.out_edges.keys()),
                            [in_edge.id])
                    else:
                        existed_node_dict[prior_node.id][2].append(in_edge.id)
                else:
                    if prior_node.id not in target_node_dict:
                        target_node_dict[prior_node.id] = (
                            prior_node, len(prior_node.in_edges.keys()) + len(prior_node.out_edges.keys()),
                            [in_edge.id])
                    else:
                        target_node_dict[prior_node.id][2].append(in_edge.id)
            # keep the _max_next highest-degree new predecessors
            # NOTE(review): a negative _max_next slices from the end; it is not validated.
            target_node_list = list(target_node_dict.values())
            target_node_list.sort(key=lambda x: x[1], reverse=True)
            if len(target_node_list) > _max_next:
                target_node_list = target_node_list[:_max_next]
            target_node_list = target_node_list + list(existed_node_dict.values())
            for prior_node, _, edge_id_list in target_node_list:
                if prior_node.id not in curr_iter_new:
                    back_iter.append(prior_node)
                    curr_iter_new.add(prior_node.id)
                for edge_id in edge_id_list:
                    in_edge = prior_node.out_edges[edge_id]
                    edge_time = 0 if in_edge.time is None else in_edge.time
                    if prior_node.id not in graph_result.nodes:
                        new_node_set.add(prior_node.id)
                    prior_node_new = graph_result._replica_add_node(prior_node.id, prior_node.name, prior_node.time) \
                        if prior_node.id not in graph_result.nodes else graph_result.find_node(prior_node.id)
                    new_edge_set.add(in_edge.id)
                    graph_result._replica_add_edge(in_edge.id, in_edge.start_id, in_edge.end_id, in_edge.relation_type,
                                                   in_edge.time, in_edge.tsv_id)
                    # the predecessor's bound is the latest edge through which it reached us
                    time_map[prior_node_new.id] = edge_time if prior_node_new.id not in time_map \
                        else max(time_map[prior_node_new.id], edge_time)
                    graph_result.update_node(prior_node_new.id, prior_node_new)
    # FrontTracking
    front_iter = deque()
    time_map = {}  # node id -> minimum time of its in-edges (lower bound for out-edges to follow)
    if origin_back == 0:
        for _id in _id_list:
            if _id in graph_result.nodes:  # de-duplicate repeated start ids
                continue
            if _id not in pgp.nodes:
                raise VCSException(f"node id {_id} not exists")
            start_node = pgp.find_node(_id)
            front_iter.append(start_node)
            graph_result._replica_add_node(start_node.id, start_node.name, start_node.time)
            time_map[start_node.id] = float(0)
    else:
        # seed front-tracking with every node found by back-tracking
        for node in graph_result.nodes.values():
            front_iter.append(pgp.find_node(node.id))
            for in_edge in node.in_edges.values():
                edge_time = 0 if in_edge.time is None else in_edge.time
                time_map[node.id] = edge_time if node.id not in time_map else min(time_map[node.id], edge_time)
            if node.id not in time_map:
                time_map[node.id] = 0  # node without in-edges: minimum time initialised to 0
    # NOTE(review): unlike _back, a negative _front is not rejected; it simply runs no hops.
    while not len(front_iter) == 0 and _front > 0:
        _front -= 1
        front_iter_size = len(front_iter)
        new_node_set = set()
        new_edge_set = set()
        curr_iter_new = set()  # nodes already queued for the next hop (avoids duplicates)
        for _ in range(front_iter_size):
            node_old: Node = front_iter.popleft()
            node: Node = graph_result.find_node(node_old.id)
            # Pruning: gather candidate successors before capping them with _max_next
            target_node_dict = {}  # { node_id: (node, degree, [edge_id_list]) }
            existed_node_dict = {}  # { node_id: (node, degree, [edge_id_list]) }
            for out_edge in node_old.out_edges.values():
                # skip self-loops
                if out_edge.end_id == out_edge.start_id:
                    continue
                # skip relations already copied into the result graph
                if out_edge.id in node.out_edges:
                    continue
                edge_time = 0 if out_edge.time is None else out_edge.time
                # an edge earlier than this node's recorded time cannot carry its influence forward
                if edge_time < time_map[node.id]:
                    continue
                next_node = pgp.find_node(out_edge.end_id)
                # If next_node is already in the result graph, its edges are always added
                # (they bypass the _max_next cap below).
                if next_node.id in graph_result.nodes:
                    if next_node.id not in existed_node_dict:
                        existed_node_dict[next_node.id] = (
                            next_node, len(next_node.in_edges.keys()) + len(next_node.out_edges.keys()),
                            [out_edge.id])
                    else:
                        existed_node_dict[next_node.id][2].append(out_edge.id)
                else:
                    if next_node.id not in target_node_dict:
                        target_node_dict[next_node.id] = (
                            next_node, len(next_node.in_edges.keys()) + len(next_node.out_edges.keys()), [out_edge.id])
                    else:
                        target_node_dict[next_node.id][2].append(out_edge.id)
            # keep the _max_next highest-degree new successors
            target_node_list = list(target_node_dict.values())
            target_node_list.sort(key=lambda x: x[1], reverse=True)
            if len(target_node_list) > _max_next:
                target_node_list = target_node_list[:_max_next]
            target_node_list = target_node_list + list(existed_node_dict.values())
            for next_node, _, edge_id_list in target_node_list:
                if next_node.id not in curr_iter_new:
                    front_iter.append(next_node)
                    curr_iter_new.add(next_node.id)
                for edge_id in edge_id_list:
                    out_edge = next_node.in_edges[edge_id]
                    edge_time = 0 if out_edge.time is None else out_edge.time
                    if next_node.id not in graph_result.nodes:
                        new_node_set.add(next_node.id)
                    next_node_new = graph_result._replica_add_node(next_node.id, next_node.name, next_node.time) \
                        if next_node.id not in graph_result.nodes else graph_result.find_node(next_node.id)
                    graph_result._replica_add_edge(out_edge.id, out_edge.start_id, out_edge.end_id,
                                                   out_edge.relation_type, out_edge.time, out_edge.tsv_id)
                    new_edge_set.add(out_edge.id)
                    # the successor's bound is the earliest edge through which it was reached
                    time_map[next_node_new.id] = edge_time if next_node_new.id not in time_map \
                        else min(time_map[next_node_new.id], edge_time)
                    graph_result.update_node(next_node_new.id, next_node_new)
    return graph_result, new_node_set, new_edge_set
def track_vision(_target_pkl, _start_str, _back, _front, _max_next):
    """Track a sub-graph of a pickled provenance graph and return it as G6 nodes/edges.

    Args:
        _target_pkl: path handed to ``ProvenanceGraphProxy.load_subgraph``.
        _start_str, _back, _front, _max_next: forwarded unchanged to ``track_graph``.

    Returns:
        ``{'nodes': [...], 'edges': [...]}``.
        - Nodes look like ``{'id': 'n_<id>', 'label': name, 'g6_type'}``.
        - Edges look like ``{'id': 'e_<id>', 'source': 'n_<start>', 'target': 'n_<end>',
          'label', 'g6_type'}``.
        - ``g6_type`` is ``"new"`` for ids in track_graph's new sets, otherwise ``"normal"``.
        - Parallel edges between one (start, end) pair are collapsed. A single edge is
          emitted as-is. Otherwise two are emitted: the earliest, labelled
          ``<relation>#<count>``, and the latest.
    """
    pgp = ProvenanceGraphProxy.load_subgraph(_target_pkl)
    graph_result, new_node_set, new_edge_set = track_graph(pgp, _start_str, _back, _front, _max_next)

    def edge_time(edge):
        # edges without a timestamp compare as time 0
        return 0 if edge.time is None else edge.time

    def edge_vo(edge, label_suffix=""):
        return {
            'id': 'e_' + str(edge.id),
            'source': 'n_' + str(edge.start_id),
            'target': 'n_' + str(edge.end_id),
            'label': str(edge.relation_type) + label_suffix,
            'g6_type': "new" if edge.id in new_edge_set else "normal",
        }

    node_vos = []
    pair_buckets = {}  # (start_id, end_id) -> [earliest_edge, latest_edge, edge_count]
    for node_id, node in graph_result.nodes.items():
        node_vos.append({
            'id': 'n_' + str(node_id),
            'label': str(node.name),
            'g6_type': "new" if node_id in new_node_set else "normal",
        })
        for edge in node.out_edges.values():
            key = (edge.start_id, edge.end_id)
            bucket = pair_buckets.get(key)
            if bucket is None:
                pair_buckets[key] = [edge, edge, 1]
                continue
            t = edge_time(edge)
            if t < edge_time(bucket[0]):
                bucket[0] = edge
            elif t >= edge_time(bucket[1]):
                bucket[1] = edge
            bucket[2] += 1

    edge_vos = []
    for earliest, latest, count in pair_buckets.values():
        if count == 1:
            edge_vos.append(edge_vo(earliest))
            continue
        edge_vos.append(edge_vo(earliest, "#" + str(count)))
        edge_vos.append(edge_vo(latest))
    return {'nodes': node_vos, 'edges': edge_vos}
def track_surrounding_vision_dfs(node: "Node", start_id: int,
                                 pgp: "ProvenanceGraphProxy", graph_result: "ProvenanceGraphProxy",
                                 pgp_memo: "ProvenanceGraphProxy",
                                 _max: int, step_left: int) -> bool:
    """Depth-first search for short paths linking ``node`` to another node of ``graph_result``.

    From ``node`` the search follows out-edges through nodes that are NOT in ``graph_result``.
    A hit is an out-edge from an intermediate (non-root) node to a node that IS in
    ``graph_result``. Every node and edge on a successful path is copied into ``pgp_memo``.
    Edges leading back to the root (``start_id``) and self-loops are ignored.

    Args:
        node: the node being expanded (taken from ``pgp``).
        start_id: id of the root of this search. Node ids are compared with the ints produced
            by ``track_graph``.
        pgp: the full provenance graph.
        graph_result: tracked graph whose nodes are the search targets; only membership is read.
        pgp_memo: scratch graph collecting found paths. A non-root node already in it is treated
            as "known to reach a target" and is not searched again.
        _max: maximum number of next hops explored per node, highest out-degree first.
        step_left: remaining hop budget, counting the current hop.

    Returns:
        True if a target is reachable from ``node`` within the budget.
    """
    # The memo short-circuit is for intermediate nodes only. The root may already be in
    # pgp_memo because an earlier search reached it as a *target*. That says nothing about paths
    # leaving it, and skipping it made the result depend on the order the roots were searched.
    if node.id != start_id and node.id in pgp_memo.nodes:
        return True
    found = False
    candidates = []  # (next_node, edge) pairs through which the search may continue
    for out_edge in node.out_edges.values():
        nn = pgp.find_node(out_edge.end_id)
        # avoid self-loops and cycles back to the root
        if out_edge.end_id == node.id or nn.id == start_id:
            continue
        if nn.id not in graph_result.nodes:
            candidates.append((nn, out_edge))
        elif node.id != start_id:
            # Direct hit from an intermediate node. Root-to-target edges are ignored because
            # they are not "surrounding" paths.
            found = True
            if node.id not in pgp_memo.nodes:
                pgp_memo._replica_add_node(node.id, node.name, node.time)
            if nn.id not in pgp_memo.nodes:
                pgp_memo._replica_add_node(nn.id, nn.name, nn.time)
            pgp_memo._replica_add_edge(
                out_edge.id, out_edge.start_id, out_edge.end_id, out_edge.relation_type, out_edge.time,
                out_edge.tsv_id)
    if step_left > 1:
        # Prune to the _max highest out-degree candidates. The sort is stable, so equal degrees
        # keep their edge order; a non-positive _max explores nothing.
        candidates.sort(key=lambda pair: len(pair[0].out_edges), reverse=True)
        for nn, out_edge in candidates[:max(_max, 0)]:
            if track_surrounding_vision_dfs(nn, start_id, pgp, graph_result, pgp_memo, _max, step_left - 1):
                # the child reaches a target: link this node to it
                found = True
                if node.id not in pgp_memo.nodes:
                    pgp_memo._replica_add_node(node.id, node.name, node.time)
                pgp_memo._replica_add_edge(
                    out_edge.id, out_edge.start_id, out_edge.end_id, out_edge.relation_type, out_edge.time,
                    out_edge.tsv_id)
    return found
def track_surrounding_vision(_target_pkl, _start_str, _back, _front, _max):
    """Track a sub-graph, then add "aided" paths that connect tracked nodes via outside nodes.

    After ``track_graph`` runs, every tracked node becomes the root of
    ``track_surrounding_vision_dfs``, with a hop budget of ``_back + _front`` and fan-out
    ``_max``. The nodes and edges collected in the scratch graph are merged into the result.

    Returns:
        ``{'nodes': [...], 'edges': [...]}`` in G6 form.
        - ``g6_type`` is ``"new"`` for ids in track_graph's new sets, ``"aided"`` for ids merged
          from the surrounding search, otherwise ``"normal"``.
        - Parallel edges between one (start, end) pair are collapsed. A single edge is emitted
          as-is. Otherwise two are emitted: the earliest, labelled ``<relation>#<count>``, and
          the latest.
    """
    hop_budget = _back + _front
    pgp = ProvenanceGraphProxy.load_subgraph(_target_pkl)
    graph_result, new_node_set, new_edge_set = track_graph(
        pgp, _start_str, _back, _front, _max)

    # Search around every tracked node; successful paths accumulate in a scratch graph.
    pgp_memo: ProvenanceGraphProxy = ProvenanceGraphProxy.create_temp_graph()
    for root_id in graph_result.nodes.keys():
        track_surrounding_vision_dfs(
            pgp.find_node(root_id), root_id, pgp, graph_result, pgp_memo, _max, hop_budget)

    # Merge the scratch graph into the result and remember which ids came from it.
    aid_node_set = set()
    for memo_id, memo_node in pgp_memo.nodes.items():
        if memo_id in graph_result.nodes:
            continue
        aid_node_set.add(memo_id)
        graph_result._replica_add_node(memo_id, memo_node.name, memo_node.time)
    aid_edge_set = set()
    for memo_edge_id, endpoints in pgp_memo.temp_edge_map.items():
        aid_edge_set.add(memo_edge_id)
        memo_edge = pgp_memo.find_node(endpoints[0]).out_edges[memo_edge_id]
        graph_result._replica_add_edge(
            memo_edge_id, endpoints[0], endpoints[1], memo_edge.relation_type, memo_edge.time, memo_edge.tsv_id)

    def g6_type(item_id, new_ids, aided_ids):
        if item_id in new_ids:
            return "new"
        if item_id in aided_ids:
            return "aided"
        return "normal"

    def edge_time(edge):
        # edges without a timestamp compare as time 0
        return 0 if edge.time is None else edge.time

    def edge_vo(edge, label_suffix=""):
        return {
            'id': 'e_' + str(edge.id),
            'source': 'n_' + str(edge.start_id),
            'target': 'n_' + str(edge.end_id),
            'label': str(edge.relation_type) + label_suffix,
            'g6_type': g6_type(edge.id, new_edge_set, aid_edge_set),
        }

    node_vos = []
    pair_buckets = {}  # (start_id, end_id) -> [earliest_edge, latest_edge, edge_count]
    for node_id, node in graph_result.nodes.items():
        node_vos.append({
            'id': 'n_' + str(node_id),
            'label': str(node.name),
            'g6_type': g6_type(node_id, new_node_set, aid_node_set),
        })
        for edge in node.out_edges.values():
            key = (edge.start_id, edge.end_id)
            bucket = pair_buckets.get(key)
            if bucket is None:
                pair_buckets[key] = [edge, edge, 1]
                continue
            t = edge_time(edge)
            if t < edge_time(bucket[0]):
                bucket[0] = edge
            elif t >= edge_time(bucket[1]):
                bucket[1] = edge
            bucket[2] += 1

    edge_vos = []
    for earliest, latest, count in pair_buckets.values():
        if count == 1:
            edge_vos.append(edge_vo(earliest))
            continue
        edge_vos.append(edge_vo(earliest, "#" + str(count)))
        edge_vos.append(edge_vo(latest))
    return {'nodes': node_vos, 'edges': edge_vos}
if __name__ == '__main__':
    # CLI entry point: ``python client_api.py <command> [args...]``.
    # - Results go out as one JSON OutVO line on the real stdout (print_stdout).
    # - Output that library code prints to sys.stdout is discarded via os.devnull.
    # - Only VCSException becomes a failure OutVO; any other exception propagates with a
    #   traceback on stderr.
    # Devnull is opened outside the try, so the real stdout is never closed if opening it fails.
    with open(os.devnull, 'w') as devnull:
        sys.stdout = devnull
        try:
            # A missing command falls through to the "no such client operation" error below
            # instead of raising an uncaught IndexError.
            command = sys.argv[1] if len(sys.argv) > 1 else ''
            if command == 'vcs_status':
                # python vcs_status target_dir
                ret = {
                    'status': 'not_cloned'
                }
                try:
                    target_dir = sys.argv[2]
                    vcs = VCS(target_dir)
                    if vcs._is_cloned():
                        ret = {
                            'status': 'cloned',
                            'current_version': vcs._get_current_version(),
                            'versions': vcs._get_versions_vo(),
                            'remote_version': vcs._get_branch_version('remotes', vcs._get_current_branch().split('/')[2]),
                            'structure': structure_to_vo(vcs.dataset_name, vcs._get_current_structure())
                        }
                except (VCSException, Exception):
                    # Best-effort probe: any failure is reported as 'not_cloned' rather than an error.
                    pass
                finally:
                    print_stdout(OutVO(success=True, data=ret, message='success').to_json())
            elif command == 'vcs_client':
                # python vcs_client OPERATION set_path username neo_host neo_username neo_password
                op = sys.argv[2]
                vcs_client = VCSClient(*sys.argv[3:])
                # Resolve OPERATION as a public method name. The previous eval() of argv text
                # allowed arbitrary code execution.
                method = None if op.startswith('_') else getattr(vcs_client, op, None)
                if not callable(method):
                    raise VCSException('no such client operation: {}'.format(str(sys.argv)))
                ret = method()
                print_stdout(OutVO(success=True, data=ret, message='success').to_json())
            elif command == 'vcs_clone':
                # python vcs_clone remote_url target_dir username
                VCS.clone(*sys.argv[2:])
                print_stdout(OutVO(success=True, data=None, message='success').to_json())
            elif command == 'vcs_commit':
                # python vcs_commit target_dir msg
                target_dir = sys.argv[2]
                msg = sys.argv[3]
                VCS(target_dir).commit(msg)
                print_stdout(OutVO(success=True, data=None, message='success').to_json())
            elif command == 'source_fuzzy_search':
                # python source_fuzzy_search target_pkl query
                target_pkl = sys.argv[2]
                query = sys.argv[3]
                with open(target_pkl, 'rb') as f:
                    # NOTE(review): pickle.load is only safe on trusted, locally produced files.
                    ngraph_meta: NGraphMeta = pickle.load(f)
                res = ngraph_meta.search_name_fuzzy(query)
                print_stdout(OutVO(success=True, data=res, message="success").to_json())
            elif command == 'graph_std_track':
                # python graph_std_track target_pkl start_str back front max_next
                target_pkl = sys.argv[2]
                start_str = sys.argv[3]
                back = int(sys.argv[4])
                front = int(sys.argv[5])
                max_next = int(sys.argv[6])
                res = track_vision(target_pkl, start_str, back, front, max_next)
                print_stdout(OutVO(success=True, data=res, message='success').to_json())
            elif command == 'graph_std_track_surround':
                # python graph_std_track_surround target_pkl start_str back front surround_max
                target_pkl = sys.argv[2]
                start_str = sys.argv[3]
                back = int(sys.argv[4])
                front = int(sys.argv[5])
                surround_max = int(sys.argv[6])
                res = track_surrounding_vision(target_pkl, start_str, back, front, surround_max)
                print_stdout(OutVO(success=True, data=res, message='success').to_json())
            else:
                raise VCSException('no such client operation: {}'.format(str(sys.argv)))
        except VCSException as e:
            print_stdout(OutVO(success=False, message=e.msg).to_json())
        finally:
            # restore the real stdout before devnull is closed by the with-block
            sys.stdout = sys.__stdout__
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化