import csv
import json
import os.path
import signal
import sys
import graph_core
from kafka import KafkaConsumer


class ProcessType:
    """Status labels for the ingest process."""
    infoing = "infoing"
    collecting = "collecting"


def exit_while_none_input(i):
    """Exit when an interactive prompt returned no answer (user cancelled)."""
    if i is None:
        print("User aborted the application")
        sys.exit(1)
    return i

# Runtime state shared between the main block and the signal handler.
process_status = ""
kafka_addrs = []
kafka_topics = []
tags = []
current_log_num = 1
save_tsv_path = ""
save_index_path = ""
terminate = False

def parse_row(origin_data: dict):
    """Project a decoded log entry onto the configured tag list, in order."""
    res = []
    for _tag in tags:
        if _tag not in origin_data:
            res.append("")
        else:
            res.append(str(origin_data[_tag]))
    return res
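
# Illustrative behaviour of parse_row (values made up): with
# tags = ["host", "dst_ip"], parse_row({"host": "web-01"}) returns
# ["web-01", ""] -- absent tags become empty strings, so every TSV row
# keeps one column per tag.
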
def signal_handling(signum, frame):
    """SIGINT handler: only record that a shutdown was requested."""
    global terminate
    terminate = True


def check_terminate_and_break():
    """If a SIGINT was seen, persist the graph index and exit cleanly."""
    global terminate
    global current_log_num
    if terminate:
        print("Keyboard Interrupt, saving graph index file...")
        graph.Save(save_index_path)
        print(f"Keyboard Interrupt, saved to {save_index_path}, parsed {current_log_num - 1} log entries")
        sys.exit(0)
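
# signal_handling deliberately does no work itself: it only flips `terminate`,
# and the main loop calls check_terminate_and_break() between records, so an
# interrupt never lands mid-row. A minimal standalone sketch of the same
# pattern (names below are illustrative, not part of this script):
#
#   import signal, time
#   stop = False
#   def _on_sigint(signum, frame):
#       global stop
#       stop = True
#   signal.signal(signal.SIGINT, _on_sigint)
#   while not stop:
#       time.sleep(0.1)  # stand-in for one unit of work
#   print("clean shutdown")
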
if __name__ == '__main__':
    process_status = ProcessType.infoing
    kafka_addrs = ["localhost:29092"]
    kafka_topics = ["test_topic"]
    tags = ["dst_network_type", "dst_ip_type", "dst_ip", "timestamp",
            "dev_name", "dev_info", "dst_network_area", "dst_segment_info",
            "dst_data_center", "host"]
    save_tsv_path = "./graph.tsv"
    save_index_path = "./graph.dat"
    # query_start_ans = prompt(query_start, style=custom_style_2)
    # kafka_addrs = list(exit_while_none_input(query_start_ans.get("KAFKA_SERVER")).split(","))
    # kafka_topics = list(exit_while_none_input(query_start_ans.get("KAFKA_TOPIC")).split(","))
    # tags = list(exit_while_none_input(query_start_ans.get("TAGS_STRING")).split(","))
    # save_tsv_path = exit_while_none_input(query_start_ans.get("TSV_PATH"))
    # save_index_path = exit_while_none_input(query_start_ans.get("INDEX_PATH"))
    if os.path.exists(save_index_path) or os.path.exists(save_tsv_path):
        print(f"TSV or index file ({save_tsv_path}/{save_index_path}) already exists; delete it or pick another path first")
        sys.exit(1)
    # Write the tag names as the TSV header row.
    tsv_file = open(save_tsv_path, 'w', newline='', encoding='utf-8')
    tsv_writer = csv.writer(tsv_file, delimiter='\t')
    tsv_writer.writerow(tags)
    tsv_filename = os.path.basename(save_tsv_path)
    # graph_core expects its own string-vector type rather than a Python list.
    tags_vec = graph_core.vectorString()
    for tag in tags:
        tags_vec.push_back(tag)
    graph: graph_core.PUndirectedGraph = graph_core.UndirectedGraph.New(tags_vec)
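    # Note: vectorString/push_back suggest graph_core is a native binding
    # (SWIG- or pybind-style); that is an inference, the generator is not
    # named anywhere in this file.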
print(f"连接Kafka服务器 {','.join(kafka_addrs)}, 订阅 {','.join(kafka_topics)} ... ")
kafka_consumer = KafkaConsumer(bootstrap_servers=kafka_addrs,
auto_offset_reset='earliest',
enable_auto_commit=True, group_id='provdb',
value_deserializer=lambda x: x.decode('utf-8'))
kafka_consumer.subscribe(kafka_topics)
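    # kafka-python's poll() returns {TopicPartition: [ConsumerRecord, ...]};
    # with the value_deserializer above, each record.value is already a str.
    # An illustrative (made-up) batch:
    #   {TopicPartition(topic='test_topic', partition=0):
    #       [ConsumerRecord(..., value='{"host": "web-01", "dst_ip": "10.0.0.1"}')]}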
    try:
        # Defer SIGINT: remember the original handler and install the
        # flag-setting one so Ctrl-C cannot interrupt a half-written record.
        original_sigint = signal.getsignal(signal.SIGINT)
        signal.signal(signal.SIGINT, signal_handling)
        while True:
            polled = kafka_consumer.poll(timeout_ms=5)
            for tp, msgs in polled.items():
                for msg in msgs:
                    print(f"Got {msg}")
                    entry = msg.value
                    entry_val = None
                    try:
                        entry_val = json.loads(entry)
                    except json.JSONDecodeError as e:
                        print(f"Failed to parse: {entry} ({e})")
                        check_terminate_and_break()
                        continue
                    # One node per log line, one node per (tag, value) pair,
                    # and an edge linking the log line to each of its values.
                    graph.AddLogNode(tsv_filename, current_log_num, 0)
                    tag_vals = parse_row(entry_val)
                    for k, v in zip(tags, tag_vals):
                        graph.AddTagNode(k, v, 0)
                        graph.AddEdge(tsv_filename, current_log_num, k, v)
                    tsv_writer.writerow(tag_vals)
                    current_log_num += 1
            check_terminate_and_break()
    finally:
        signal.signal(signal.SIGINT, original_sigint)
        kafka_consumer.close()
        tsv_file.flush()
        tsv_file.close()
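
# A minimal sketch for spot-checking the TSV output after a run, kept
# commented out like the prompt block above (assumes the default ./graph.tsv):
#
#   with open("./graph.tsv", newline='', encoding='utf-8') as f:
#       for row in csv.reader(f, delimiter='\t'):
#           print(row)  # first row is the tag header, then one row per entry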