From 8e4a925f312b7626f810d6e30e4dcc7ecce4a98b Mon Sep 17 00:00:00 2001 From: boluo1 <11274284+boluo56@user.noreply.gitee.com> Date: Thu, 26 Sep 2024 13:40:28 +0000 Subject: [PATCH 1/2] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=8A=9F=E8=83=BD=20?= =?UTF-8?q?=EF=BC=9Asentryctl=20get=5Falarm=20=20[-s=20]=20[-d]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: boluo1 <11274284+boluo56@user.noreply.gitee.com> --- sysSentry-1.0.2/src/python/syssentry/alarm.py | 171 ++++++++++++++++ .../src/python/syssentry/callbacks.py | 13 ++ .../src/python/syssentry/cpu_sentry.py | 36 ++-- .../src/python/syssentry/global_values.py | 3 + .../src/python/syssentry/load_mods.py | 9 + .../src/python/syssentry/sentryctl | 20 +- .../src/python/syssentry/syssentry.py | 13 +- .../src/python/syssentry/task_map.py | 7 +- .../src/python/xalarm/register_xalarm.py | 187 ++++++++++++++++++ .../src/python/xalarm/sentry_notify.py | 71 +++++++ .../src/python/xalarm/xalarm_api.py | 19 +- .../src/python/xalarm/xalarm_server.py | 41 +++- .../src/python/xalarm/xalarm_transfer.py | 4 +- xalarm_test/xalarm_sen.py | 7 + 14 files changed, 561 insertions(+), 40 deletions(-) create mode 100644 sysSentry-1.0.2/src/python/syssentry/alarm.py create mode 100644 sysSentry-1.0.2/src/python/xalarm/register_xalarm.py create mode 100644 sysSentry-1.0.2/src/python/xalarm/sentry_notify.py create mode 100644 xalarm_test/xalarm_sen.py diff --git a/sysSentry-1.0.2/src/python/syssentry/alarm.py b/sysSentry-1.0.2/src/python/syssentry/alarm.py new file mode 100644 index 0000000..1bbf68e --- /dev/null +++ b/sysSentry-1.0.2/src/python/syssentry/alarm.py @@ -0,0 +1,171 @@ +# coding: utf-8 +# Copyright (c) 2024 Huawei Technologies Co., Ltd. +# sysSentry is licensed under the Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +# PURPOSE. +# See the Mulan PSL v2 for more details. + +""" +use for report alarm +""" +import threading +from typing import Dict, List +from datetime import datetime +import time +import logging +import json + +# from xalarm import xalarm_register, Xalarm +from xalarm.register_xalarm import xalarm_register +from xalarm.xalarm_api import Xalarm +from xalarm.register_xalarm import * + +from .global_values import InspectTask +from .task_map import TasksMap + +# 告警ID映射字典,key为插件名,value为告警ID(类型为数字) +task_alarm_id_dict: Dict[str, int] = {} + +# 告警老化时间字典,key为告警ID,value为老化时间(类型为数字,单位为秒) +alarm_id_clear_time_dict: Dict[int, int] = {} + +# 告警事件列表,key为告警ID,value为告警ID对应的告警事件列表(类型为list) +alarm_list_dict: Dict[int, List[Xalarm]] = {} +# 告警事件列表锁 +alarm_list_lock = threading.Lock() + +id_filter = [] +id_base = 1001 + +TIME_UNIT_MILLISECONDS = 1000 + +def xalarm_gettime(alarm_info: Xalarm) -> int: + if not alarm_info: + return 0 + return alarm_info.timetamp.tv_sec * TIME_UNIT_MILLISECONDS + alarm_info.timetamp.tv_usec / TIME_UNIT_MILLISECONDS + +def update_alarm_list(alarm_info: Xalarm): + logging.info(f"进入 update_alarm_list() 中") + alarm_id = alarm_info.alarm_id + timestamp = xalarm_gettime(alarm_info) + alarm_list_lock.acquire() + try: + # new alarm is inserted into list head + if alarm_id not in alarm_list_dict: + logging.warn(f"update_alarm_list: alarm_id {alarm_id} not found in alarm_list_dict") + return + alarm_list = alarm_list_dict[alarm_id] + + logging.info(f"alarm_list_dict中的内容为: {alarm_list_dict}") + logging.info(f"在update_alarm_list()中 alarm_list={alarm_list}") + + alarm_list.insert(0, alarm_info) + # clear alarm_info older than clear time threshold + clear_index = -1 + clear_time = alarm_id_clear_time_dict[alarm_id] + for i in range(len(alarm_list)): + if timestamp - xalarm_gettime(alarm_list[i]) > clear_time: + clear_index = i + break + if clear_index >= 0: + alarm_list_dict[alarm_id] = alarm_list[:clear_index] + finally: + alarm_list_lock.release() + + + +def alarm_register(): + logging.info(f"alarm_register: enter") + # 初始化告警ID映射字典、告警老化时间字典 + for task_type in TasksMap.tasks_dict: + for task_name in TasksMap.tasks_dict[task_type]: + logging.info(f"alarm_register: {task_name} is registered") + task = TasksMap.tasks_dict[task_type][task_name] + alarm_id = task.alarm_id + alarm_clear_time = task.alarm_clear_time + alarm_list_dict[alarm_id] = [] + task_alarm_id_dict[task_name] = alarm_id + if alarm_id not in alarm_id_clear_time_dict: + alarm_id_clear_time_dict[alarm_id] = alarm_clear_time + else: + alarm_id_clear_time_dict[alarm_id] = max(alarm_clear_time, alarm_id_clear_time_dict[alarm_id]) + # 注册告警回调 + id_filter = [True] * 128 + # for id in list(set(task_alarm_id_dict.values())): + # logging.info(f"alarm_register: {id} is filtered") + # id_filter[id - id_base] = True + ret = xalarm_register(update_alarm_list, id_filter) + if ret < 0: + logging.info(f'register注册失败') + return + logging.info(f'ret是注册结果:{ret}') + logging.info('register 注册成功') + + +def get_alarm_result(task_name: str, time_range: int, detailed: bool) -> List[Dict]: + alarm_list_lock.acquire() + try: + if task_name not in task_alarm_id_dict: + return [] + alarm_id = task_alarm_id_dict[task_name] + if alarm_id not in alarm_list_dict: + return [] + alarm_list = alarm_list_dict[alarm_id] + + + logging.info(f"alarm_list_dict {alarm_list_dict}") + + + logging.info(f"get_alarm_result: alarm_list of {alarm_id} has {len(alarm_list)} elements") + # clear alarm_info older than clear time threshold + stop_index = -1 + timestamp = int(datetime.now().timestamp()) + for i in range(len(alarm_list)): + logging.info(f"timestamp, alarm_list[{i}].timestamp: {timestamp}, {xalarm_gettime(alarm_list[i])}") + if timestamp - xalarm_gettime(alarm_list[i]) > int(time_range): + stop_index = i + break + if stop_index >= 0: + alarm_list = alarm_list[:stop_index] + # filter alarms whose source does not match + + logging.info(f"进入 get_alarm_result() 中,筛选出alarm_list中符合条件的alarm对象") + + # alarm_list = [alarm.__dict__ for alarm in alarm_list if not alarm.alarm_source or alarm.alarm_source == task_name] + logging.info(f"get_alarm_result: final alarm_list of {alarm_id} has {len(alarm_list)} elements") + # keep detail + + def xalarm_to_dict(alarm_info: Xalarm) -> dict: + return { + 'alarm_id': alarm_info.alarm_id, + 'alarm_type': alarm_info.alarm_type, + 'alarm_level': alarm_info.alarm_level, + 'timetamp': xalarm_gettime(alarm_info), + 'msg1': alarm_info.msg1.decode('utf-8').rstrip('\x00') + } + + alarm_list = [xalarm_to_dict(alarm) for alarm in alarm_list] + + # alarm_list = [alarm.__dict__ for alarm in alarm_list if not alarm.alarm_source or alarm.alarm_source == task_name] + + for alarm in alarm_list: + # alarm.pop('detail', None) + logging.info(f"alarm: {alarm}") + alarm_info = alarm['msg1'] + alarm_info = json.loads(alarm_info) + if not detailed: + if 'details' in alarm_info: + alarm_info.pop('details', None) + alarm.pop('msg1', None) + alarm['alarm_info'] = alarm_info + + # if not detailed: + # for alarm in alarm_list: + # alarm.pop('detail', None) + return alarm_list + finally: + alarm_list_lock.release() \ No newline at end of file diff --git a/sysSentry-1.0.2/src/python/syssentry/callbacks.py b/sysSentry-1.0.2/src/python/syssentry/callbacks.py index b38b381..534bba2 100644 --- a/sysSentry-1.0.2/src/python/syssentry/callbacks.py +++ b/sysSentry-1.0.2/src/python/syssentry/callbacks.py @@ -18,6 +18,7 @@ import logging from .task_map import TasksMap, ONESHOT_TYPE, PERIOD_TYPE from .mod_status import EXITED_STATUS, RUNNING_STATUS, WAITING_STATUS, set_runtime_status +from .alarm import get_alarm_result def task_get_status(mod_name): @@ -41,6 +42,18 @@ def task_get_result(mod_name): return "success", task.get_result() +def task_get_alarm(data): + """get result by mod name""" + task_name = data['task_name'] + time_range = data['time_range'] + detailed = data['detailed'] + task = TasksMap.get_task_by_name(task_name) + if not task: + return "failed", f"cannot find task by name {task_name}" + if not task.load_enabled: + return "failed", f"mod {task_name} is not enabled" + + return "success", get_alarm_result(task_name, time_range, detailed) def task_stop(mod_name): """stop by mod name""" diff --git a/sysSentry-1.0.2/src/python/syssentry/cpu_sentry.py b/sysSentry-1.0.2/src/python/syssentry/cpu_sentry.py index 2f18d14..72925eb 100644 --- a/sysSentry-1.0.2/src/python/syssentry/cpu_sentry.py +++ b/sysSentry-1.0.2/src/python/syssentry/cpu_sentry.py @@ -26,8 +26,6 @@ CPU_SENTRY_PARAM_CONFIG = "/etc/sysSentry/plugins/cpu_sentry.ini" # Inspection commands running at the bottom layer LOW_LEVEL_INSPECT_CMD = "cat-cli" -# max length of msg in details -DETAILS_LOG_MSG_MAX_LEN = 255 class CpuSentry: """ @@ -96,10 +94,22 @@ class CpuSentry: self.send_result["details"]["msg"] = "cpu_sentry task is killed!" return + if "ERROR" in stdout: + self.send_result["result"] = ResultLevel.FAIL + self.send_result["details"]["code"] = 1004 + + # Remove ANSI escape sequences + error_info = stdout.split("\n")[0] + if error_info.startswith("\u001b"): + ansi_escape = r'\x1b\[([0-9]+)(;[0-9]+)*([A-Za-z])' + error_info = re.sub(ansi_escape, '', error_info) + + self.send_result["details"]["msg"] = error_info + return + out_split = stdout.split("\n") - isolated_cores_number = -1 + isolated_cores_number = 0 found_fault_cores_list = [] - error_msg_list = [] for out_line_i in out_split: if "handle_patrol_result: Found fault cores" in out_line_i: cores_number_tmp = out_line_i.split("Found fault cores:")[1] @@ -111,25 +121,9 @@ class CpuSentry: elif out_line_i.startswith(''): self.send_result["details"]["isolated_cpu_list"] = out_line_i.split(':')[1] break - elif "ERROR" in out_line_i: - logging.error("[cat-cli error] - %s\n", out_line_i) - error_msg_list.append(out_line_i) found_fault_cores_number = len(set(found_fault_cores_list)) - if isolated_cores_number == -1: - self.send_result["result"] = ResultLevel.FAIL - self.send_result["details"]["code"] = 1004 - - send_error_msg = "" - # Remove ANSI escape sequences - for error_info in error_msg_list: - if error_info.startswith("\u001b"): - ansi_escape = r'\x1b\[([0-9]+)(;[0-9]+)*([A-Za-z])' - error_info = re.sub(ansi_escape, '', error_info) - if len(send_error_msg) + len(error_info) < DETAILS_LOG_MSG_MAX_LEN: - send_error_msg += ";" + error_info - self.send_result["details"]["msg"] = send_error_msg - elif found_fault_cores_number == 0: + if found_fault_cores_number == 0: self.send_result["details"]["code"] = 0 self.send_result["result"] = ResultLevel.PASS elif 0 in found_fault_cores_list: diff --git a/sysSentry-1.0.2/src/python/syssentry/global_values.py b/sysSentry-1.0.2/src/python/syssentry/global_values.py index 483d544..7a02010 100644 --- a/sysSentry-1.0.2/src/python/syssentry/global_values.py +++ b/sysSentry-1.0.2/src/python/syssentry/global_values.py @@ -76,6 +76,9 @@ class InspectTask: self.env_file = "" # start mode self.conflict = "up" + # alarm id + self.alarm_id = -1 + self.alarm_clear_time = 1800 def start(self): """ diff --git a/sysSentry-1.0.2/src/python/syssentry/load_mods.py b/sysSentry-1.0.2/src/python/syssentry/load_mods.py index 48d7e66..f7b5aa9 100644 --- a/sysSentry-1.0.2/src/python/syssentry/load_mods.py +++ b/sysSentry-1.0.2/src/python/syssentry/load_mods.py @@ -41,6 +41,8 @@ CONF_TASK_RESTART = 'task_restart' CONF_ONSTART = 'onstart' CONF_ENV_FILE = 'env_file' CONF_CONFLICT = 'conflict' +CONF_ALARM_ID = 'alarm_id' +CONF_ALARM_CLEAR_TIME = 'alarm_clear_time' MOD_FILE_SUFFIX = '.mod' MOD_SUFFIX_LEN = 4 @@ -194,6 +196,13 @@ def parse_mod_conf(mod_name, mod_conf): task.heartbeat_interval = heartbeat_interval task.load_enabled = is_enabled + try: + task.alarm_id = int(mod_conf.get(CONF_TASK, CONF_ALARM_ID)) + task.alarm_clear_time = int(mod_conf.get(CONF_TASK, CONF_ALARM_CLEAR_TIME)) + except: + task.alarm_id = 1001 + task.alarm_clear_time = 15 + if CONF_ONSTART in mod_conf.options(CONF_TASK): is_onstart = (mod_conf.get(CONF_TASK, CONF_ONSTART) == 'yes') if task_type == PERIOD_CONF: diff --git a/sysSentry-1.0.2/src/python/syssentry/sentryctl b/sysSentry-1.0.2/src/python/syssentry/sentryctl index e94491f..42fd7ef 100644 --- a/sysSentry-1.0.2/src/python/syssentry/sentryctl +++ b/sysSentry-1.0.2/src/python/syssentry/sentryctl @@ -25,6 +25,7 @@ MAX_PARAM_LENGTH = 256 RESULT_MSG_DATA_LEN = 4 CTL_MSG_LEN_LEN = 3 +DEFAULT_ALARM_TIME_RANGE = 10 def status_output_format(res_data): """format output""" @@ -57,6 +58,8 @@ def res_output_handle(res_struct, req_type): status_output_format(res_struct['data']) elif req_type == 'get_result': result_output_format(res_struct['data']) + elif req_type == 'get_alarm': + result_output_format(res_struct['data']) elif res_struct['ret'] == "failed": print(res_struct['data']) @@ -75,6 +78,7 @@ def client_send_and_recv(request_data, data_str_len): print("sentryctl: client creat socket error") return None + # connect to syssentry try: client_socket.connect(CTL_SOCKET_PATH) except OSError: @@ -82,6 +86,7 @@ def client_send_and_recv(request_data, data_str_len): print("sentryctl: client connect error") return None + # msg: CTL{len}{data} req_data_len = len(request_data) request_msg = "CTL" + str(req_data_len).zfill(3) + request_data @@ -94,8 +99,8 @@ def client_send_and_recv(request_data, data_str_len): print("sentryctl: client communicate error") return None + # res: RES{len}{data} res_magic = res_data[:3] - if res_magic != "RES": print("res msg format error") return None @@ -128,6 +133,10 @@ if __name__ == '__main__': parser_status.add_argument('task_name') parser_get_result = subparsers.add_parser('get_result', help='get task result') parser_get_result.add_argument('task_name') + parser_get_alarm = subparsers.add_parser('get_alarm', help='get task alarm') + parser_get_alarm.add_argument('task_name') + parser_get_alarm.add_argument('-s', '--time_range', type=str, default=DEFAULT_ALARM_TIME_RANGE, help='指定时间范围') + parser_get_alarm.add_argument('-d', '--detailed', action='store_true', help='打印详细信息') parser_list = subparsers.add_parser('list', help='show all loaded task mod') client_args = parser.parse_args() @@ -142,6 +151,15 @@ if __name__ == '__main__': req_msg_struct = {"type": "get_status", "data": client_args.task_name} elif client_args.cmd_type == 'get_result': req_msg_struct = {"type": "get_result", "data": client_args.task_name} + elif client_args.cmd_type == 'get_alarm': + req_msg_struct = { + "type": "get_alarm", + "data": { + 'task_name': client_args.task_name, + 'time_range': client_args.time_range, + 'detailed': client_args.detailed, + } + } elif client_args.cmd_type == 'reload': req_msg_struct = {"type": "reload", "data": client_args.task_name} else: diff --git a/sysSentry-1.0.2/src/python/syssentry/syssentry.py b/sysSentry-1.0.2/src/python/syssentry/syssentry.py index 776971f..204cf8c 100644 --- a/sysSentry-1.0.2/src/python/syssentry/syssentry.py +++ b/sysSentry-1.0.2/src/python/syssentry/syssentry.py @@ -28,7 +28,7 @@ from .sentry_config import SentryConfig from .task_map import TasksMap from .global_values import SENTRY_RUN_DIR, CTL_SOCKET_PATH, SENTRY_RUN_DIR_PERM from .cron_process import period_tasks_handle -from .callbacks import mod_list_show, task_start, task_get_status, task_stop, task_get_result +from .callbacks import mod_list_show, task_start, task_get_status, task_stop, task_get_result, task_get_alarm from .mod_status import get_task_by_pid, set_runtime_status from .load_mods import load_tasks, reload_single_mod from .heartbeat import (heartbeat_timeout_chk, heartbeat_fd_create, @@ -36,6 +36,7 @@ from .heartbeat import (heartbeat_timeout_chk, heartbeat_fd_create, from .result import RESULT_MSG_HEAD_LEN, RESULT_MSG_MAGIC_LEN, RESULT_MAGIC from .result import RESULT_LEVEL_ERR_MSG_DICT, ResultLevel from .utils import get_current_time_string +from .alarm import alarm_register CPU_EXIST = True @@ -62,6 +63,7 @@ type_func = { 'stop': task_stop, 'get_status': task_get_status, 'get_result': task_get_result, + 'get_alarm': task_get_alarm, 'reload': reload_single_mod } @@ -107,11 +109,12 @@ def msg_data_process(msg_data): return "Invaild cmd type" cmd_param = data_struct['data'] - logging.debug("msg_data_process cmd_type:%s cmd_param:%s", cmd_type, cmd_param) + logging.info("msg_data_process cmd_type:%s cmd_param:%s", cmd_type, str(cmd_param)) if cmd_type in type_func: ret, res_data = type_func[cmd_type](cmd_param) else: ret, res_data = type_func_void[cmd_type]() + logging.info("msg_data_process res_data:%s",str(res_data)) res_msg_struct = {"ret": ret, "data": res_data} res_msg = json.dumps(res_msg_struct) @@ -256,7 +259,7 @@ def server_recv(server_socket: socket.socket): logging.debug("res head %s", res_head) res_msg = res_head + res_data - logging.debug("res msg %s", res_msg) + logging.info("res msg %s", res_msg) try: client_socket.send(res_msg.encode()) @@ -581,10 +584,10 @@ def main(): _ = SentryConfig.init_param() TasksMap.init_task_map() load_tasks() + alarm_register() main_loop() except Exception: logging.error('%s', traceback.format_exc()) finally: - release_pidfile() - + release_pidfile() \ No newline at end of file diff --git a/sysSentry-1.0.2/src/python/syssentry/task_map.py b/sysSentry-1.0.2/src/python/syssentry/task_map.py index 70aa19d..6f96377 100644 --- a/sysSentry-1.0.2/src/python/syssentry/task_map.py +++ b/sysSentry-1.0.2/src/python/syssentry/task_map.py @@ -13,6 +13,9 @@ tasks map class and initialize function. """ import logging +from typing import Dict + +# from .global_values import InspectTask ONESHOT_TYPE = "ONESHOT" PERIOD_TYPE = "PERIOD" @@ -22,7 +25,7 @@ TASKS_MAP = None class TasksMap: """task map class""" - tasks_dict = {} + tasks_dict: Dict[str, Dict] = {} @classmethod def init_task_map(cls): @@ -64,4 +67,4 @@ class TasksMap: res = cls.tasks_dict.get(task_type).get(task_name) logging.debug("getting task by name: %s", res) break - return res + return res \ No newline at end of file diff --git a/sysSentry-1.0.2/src/python/xalarm/register_xalarm.py b/sysSentry-1.0.2/src/python/xalarm/register_xalarm.py new file mode 100644 index 0000000..5121bf3 --- /dev/null +++ b/sysSentry-1.0.2/src/python/xalarm/register_xalarm.py @@ -0,0 +1,187 @@ +import os +import sys +import socket +import logging +import threading +import time +import fcntl +import inspect +from struct import error as StructParseError + +from .xalarm_api import Xalarm, alarm_bin2stu + + +ALARM_REPORT_LEN = 1048 +MAX_NUM_OF_ALARM_ID=128 +MIN_ALARM_ID = 1001 +MAX_ALARM_ID = (MIN_ALARM_ID + MAX_NUM_OF_ALARM_ID - 1) +DIR_XALARM = "/var/run/xalarm" +PATH_REG_ALARM = "/var/run/xalarm/alarm" +PATH_REPORT_ALARM = "/var/run/xalarm/report" +ALARM_DIR_PERMISSION = 0o0750 +ALARM_REG_SOCK_PERMISSION = 0o0700 +ALARM_SOCKET_PERMISSION = 0o0700 +TIME_UNIT_MILLISECONDS = 1000 +ALARM_REGISTER_INFO = None + + +class AlarmRegister: + def __init__(self, id_filter: list[bool], callback: callable): + self.id_filter = id_filter + self.callback = callback + self.socket = self.create_unix_socket() + self.is_registered = True + self.thread = threading.Thread(target=self.alarm_recv) + self.thread_should_stop = False + + def check_params(self) -> bool: + if (len(self.id_filter) != MAX_NUM_OF_ALARM_ID): + sys.stderr.write("check_params: invalid param id_filter\n") + return False + + sig = inspect.signature(self.callback) + if len(sig.parameters) != 1: + sys.stderr.write("check_params: invalid param callback\n") + return False + + if self.socket is None: + sys.stderr.write("check_params: scoket create failed\n") + return False + return True + + def set_id_filter(self, id_filter: list[bool]) -> bool: + if (len(id_filter) > MAX_NUM_OF_ALARM_ID): + sys.stderr.write("set_id_filter: invalid param id_filter\n") + return False + self.id_filter = id_filter + + def id_is_registered(self, alarm_id) -> bool: + if alarm_id < MIN_ALARM_ID or alarm_id > MAX_ALARM_ID: + return False + return self.id_filter[alarm_id - MIN_ALARM_ID] + + def put_alarm_info(self, alarm_info: Xalarm) -> None: + if not self.callback or not alarm_info: + return + if not self.id_is_registered(alarm_info.alarm_id): + return + self.callback(alarm_info) + + def create_unix_socket(self) -> socket.socket: + try: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.setblocking(False) + + if not os.access(DIR_XALARM, os.F_OK): + os.makedirs(DIR_XALARM) + os.chmod(DIR_XALARM, ALARM_DIR_PERMISSION) + + sock.connect(PATH_REG_ALARM) + return sock + except (IOError, OSError, FileNotFoundError) as e: + sock.close() + sys.stderr.write(f"create_unix_socket: create socket error:{e}\n") + return None + + def alarm_recv(self): + while not self.thread_should_stop: + try: + data = self.socket.recv(ALARM_REPORT_LEN) + if not data: + sys.stderr.write("connection closed by xalarmd.service, maybe connections reach max num or service stopped.\n") + self.thread_should_stop = True + break + if len(data) != ALARM_REPORT_LEN: + sys.stderr.write(f"server receive report msg length wrong {len(data)}\n") + continue + + alarm_info = alarm_bin2stu(data) + self.put_alarm_info(alarm_info) + except (BlockingIOError) as e: + time.sleep(0.1) + except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError): + sys.stderr.write("connection closed by the server.\n") + self.thread_should_stop = True + except (ValueError, StructParseError, InterruptedError) as e: + sys.stderr.write(f"{e}\n") + except Exception as e: + sys.stderr.write(f"{e}\n") + self.thread_should_stop = True + + def start_thread(self) -> None: + self.thread.daemon = True + self.thread.start() + + def stop_thread(self) -> None: + self.thread_should_stop = True + self.thread.join() + + +def xalarm_register(callback: callable, id_filter: list[bool]) -> int: + global ALARM_REGISTER_INFO + logging.info('1234445566777888') + if ALARM_REGISTER_INFO is not None: + sys.stderr.write("xalarm_register: alarm has registered\n") + return -1 + + ALARM_REGISTER_INFO = AlarmRegister(id_filter, callback) + if not ALARM_REGISTER_INFO.check_params(): + return -1 + + ALARM_REGISTER_INFO.start_thread() + + return 0 + + +def xalarm_unregister(clientId: int) -> None: + global ALARM_REGISTER_INFO + if clientId < 0: + sys.stderr.write("xalarm_unregister: invalid client\n") + return + + if ALARM_REGISTER_INFO is None: + sys.stderr.write("xalarm_unregister: alarm has not registered\n") + return + + ALARM_REGISTER_INFO.stop_thread() + ALARM_REGISTER_INFO = None + + +def xalarm_upgrade(clientId: int, id_filter: list[bool]) -> None: + global ALARM_REGISTER_INFO + if clientId < 0: + sys.stderr.write("xalarm_unregister: invalid client\n") + return + if ALARM_REGISTER_INFO is None: + sys.stderr.write("xalarm_unregister: alarm has not registered\n") + return + ALARM_REGISTER_INFO.id_filter = id_filter + + +def xalarm_getid(alarm_info: Xalarm) -> int: + if not alarm_info: + return 0 + return alarm_info.alarm_id + + +def xalarm_getlevel(alarm_info: Xalarm) -> int: + if not alarm_info: + return 0 + return alarm_info.alarm_level + + +def xalarm_gettype(alarm_info: Xalarm) -> int: + if not alarm_info: + return 0 + return alarm_info.alarm_type + + +def xalarm_gettime(alarm_info: Xalarm) -> int: + if not alarm_info: + return 0 + return alarm_info.timetamp.tv_sec * TIME_UNIT_MILLISECONDS + alarm_info.timetamp.tv_usec / TIME_UNIT_MILLISECONDS + +def xalarm_getdesc(alarm_info: Xalarm) -> str: + if not alarm_info: + return None + return alarm_info.msg1 \ No newline at end of file diff --git a/sysSentry-1.0.2/src/python/xalarm/sentry_notify.py b/sysSentry-1.0.2/src/python/xalarm/sentry_notify.py new file mode 100644 index 0000000..1827f04 --- /dev/null +++ b/sysSentry-1.0.2/src/python/xalarm/sentry_notify.py @@ -0,0 +1,71 @@ +import os +import sys +import time +import socket +from struct import error as StructParseError + +from .xalarm_api import alarm_stu2bin, Xalarm + +MAX_NUM_OF_ALARM_ID = 128 +MIN_ALARM_ID = 1001 +MAX_ALARM_ID = (MIN_ALARM_ID + MAX_NUM_OF_ALARM_ID - 1) + +MINOR_ALM = 1 +MAJOR_ALM = 2 +CRITICAL_ALM = 3 + +ALARM_TYPE_OCCUR = 1 +ALARM_TYPE_RECOVER = 2 + +MAX_PUC_PARAS_LEN = 1024 + +DIR_XALARM = "/var/run/xalarm" +PATH_REPORT_ALARM = "/var/run/xalarm/report" +ALARM_DIR_PERMISSION = 0o750 +ALARM_SOCKET_PERMISSION = 0o700 + + +def check_params(alarm_id, alarm_level, alarm_type, puc_paras) -> bool: + if not os.path.exists(DIR_XALARM): + sys.stderr.write(f"check_params: {DIR_XALARM} not exist, failed") + return False + + if not os.path.exists(PATH_REPORT_ALARM): + sys.stderr.write(f"check_params: {PATH_REPORT_ALARM} not exist, failed") + return False + + if (alarm_id < MIN_ALARM_ID or alarm_id > MAX_ALARM_ID or + alarm_level < MINOR_ALM or alarm_level > CRITICAL_ALM or + alarm_type < ALARM_TYPE_OCCUR or alarm_type > ALARM_TYPE_RECOVER): + sys.stderr.write("check_params: alarm info invalid\n") + return False + + if len(puc_paras) >= MAX_PUC_PARAS_LEN: + sys.stderr.write(f"check_params: alarm msg should be less than {MAX_PUC_PARAS_LEN}\n") + return False + + return True + +def xalarm_report(alarm_id, alarm_level, alarm_type, puc_paras) -> bool: + if not check_params(alarm_id, alarm_level, alarm_type, puc_paras): + return False + + try: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + + # 时间戳不一致 + current_time = time.time() + current_time_seconds = int(current_time) + current_microseconds = int((current_time - current_time_seconds) * 1_000_000) + alarm_info = Xalarm(alarm_id, alarm_type, alarm_level, + current_time_seconds, current_microseconds, puc_paras) + print('sentry_to!!!!!!!!') + sock.sendto(alarm_stu2bin(alarm_info), PATH_REPORT_ALARM) + except (FileNotFoundError, StructParseError, socket.error, OSError, UnicodeError) as e: + sys.stderr.write(f"check_params: error occurs when sending msg.{e}\n") + return False + finally: + sock.close() + + return True + diff --git a/sysSentry-1.0.2/src/python/xalarm/xalarm_api.py b/sysSentry-1.0.2/src/python/xalarm/xalarm_api.py index a2cdb25..fb9dea6 100644 --- a/sysSentry-1.0.2/src/python/xalarm/xalarm_api.py +++ b/sysSentry-1.0.2/src/python/xalarm/xalarm_api.py @@ -23,6 +23,7 @@ ALARM_LEVELS = (1, 2, 3, 4, 5) ALARM_SOCK_PATH = "/var/run/xalarm/report" MIN_ALARM_ID = 1001 MAX_ALARM_ID = 1128 +MAX_MSG_LEN = 1024 @dataclasses.dataclass @@ -97,15 +98,15 @@ class Xalarm: def msg1(self, msg): """msg1 setter """ - if len(msg) > 512: - raise ValueError("msg1 length must below 512") + if len(msg) > MAX_MSG_LEN: + raise ValueError(f"msg1 length must below {MAX_MSG_LEN}") self._msg1 = msg def alarm_bin2stu(bin_data): """alarm binary to struct """ - struct_data = struct.unpack("@HBBll512s", bin_data) + struct_data = struct.unpack(f"@HBBll{MAX_MSG_LEN}s", bin_data) alarm_info = Xalarm(1001, 2, 1, 0, 0, "") alarm_info.alarm_id = struct_data[0] @@ -116,3 +117,15 @@ def alarm_bin2stu(bin_data): alarm_info.msg1 = struct_data[5] return alarm_info + + +def alarm_stu2bin(alarm_info: Xalarm): + return struct.pack( + f'@HBBll{MAX_MSG_LEN}s', + alarm_info.alarm_id, + alarm_info.alarm_level, + alarm_info.alarm_type, + alarm_info.timetamp.tv_sec, + alarm_info.timetamp.tv_usec, + alarm_info.msg1.encode('utf-8')) + \ No newline at end of file diff --git a/sysSentry-1.0.2/src/python/xalarm/xalarm_server.py b/sysSentry-1.0.2/src/python/xalarm/xalarm_server.py index 84db273..8e975c0 100644 --- a/sysSentry-1.0.2/src/python/xalarm/xalarm_server.py +++ b/sysSentry-1.0.2/src/python/xalarm/xalarm_server.py @@ -17,16 +17,20 @@ Create: 2023-11-02 import socket import os import logging +import select +import threading from struct import error as StructParseError from .xalarm_api import alarm_bin2stu -from .xalarm_transfer import check_filter, transmit_alarm +from .xalarm_transfer import check_filter, transmit_alarm, wait_for_connection ALARM_DIR = "/var/run/xalarm" +USER_RECV_SOCK = "/var/run/xalarm/alarm" SOCK_FILE = "/var/run/xalarm/report" -ALARM_REPORT_LEN = 536 +ALARM_REPORT_LEN = 1048 ALARM_DIR_PERMISSION = 0o750 +ALARM_LISTEN_QUEUE_LEN = 5 def clear_sock_path(): @@ -37,6 +41,8 @@ def clear_sock_path(): os.chmod(ALARM_DIR, ALARM_DIR_PERMISSION) if os.path.exists(SOCK_FILE): os.unlink(SOCK_FILE) + if os.path.exists(USER_RECV_SOCK): + os.unlink(USER_RECV_SOCK) def server_loop(alarm_config): @@ -49,6 +55,21 @@ def server_loop(alarm_config): sock.bind(SOCK_FILE) os.chmod(SOCK_FILE, 0o600) + alarm_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + alarm_sock.bind(USER_RECV_SOCK) + os.chmod(USER_RECV_SOCK, 0o600) + alarm_sock.listen(ALARM_LISTEN_QUEUE_LEN) + alarm_sock.setblocking(False) + + epoll = select.epoll() + epoll.register(alarm_sock.fileno(), select.EPOLLIN) + fd_to_socket = {alarm_sock.fileno(): alarm_sock,} + thread_should_stop = False + + thread = threading.Thread(target=wait_for_connection, args=(alarm_sock, epoll, fd_to_socket, thread_should_stop)) + thread.daemon = True + thread.start() + while True: try: data, _ = sock.recvfrom(ALARM_REPORT_LEN) @@ -58,14 +79,20 @@ def server_loop(alarm_config): logging.debug("server receive report msg length wrong %d", len(data)) continue - alarm_info = alarm_bin2stu(data) - logging.debug("server bin2stu msg") + logging.info("server bin2stu msg") if not check_filter(alarm_info, alarm_config): continue + transmit_alarm(alarm_sock, epoll, fd_to_socket, data) + except Exception as e: + logging.error(f"Error server:{e}") + + thread_should_stop = True + thread.join() - transmit_alarm(data) - except (ValueError, StructParseError): - pass + epoll.unregister(alarm_sock.fileno()) + epoll.close() + alarm_sock.close() + os.unlink(USER_RECV_SOCK) sock.close() diff --git a/sysSentry-1.0.2/src/python/xalarm/xalarm_transfer.py b/sysSentry-1.0.2/src/python/xalarm/xalarm_transfer.py index b590b43..78521b7 100644 --- a/sysSentry-1.0.2/src/python/xalarm/xalarm_transfer.py +++ b/sysSentry-1.0.2/src/python/xalarm/xalarm_transfer.py @@ -16,10 +16,12 @@ Create: 2023-11-02 import socket import logging +import select -USER_RECV_SOCK = "/var/run/xalarm/alarm" MIN_ID_NUMBER = 1001 MAX_ID_NUMBER = 1128 +MAX_CONNECTION_NUM = 100 +TEST_CONNECT_BUFFER_SIZE = 32 def check_filter(alarm_info, alarm_filter): diff --git a/xalarm_test/xalarm_sen.py b/xalarm_test/xalarm_sen.py new file mode 100644 index 0000000..ed79757 --- /dev/null +++ b/xalarm_test/xalarm_sen.py @@ -0,0 +1,7 @@ +from xalarm.sentry_notify import xalarm_report + + +if __name__ == "__main__": + + alarm_info='''{"driver_name": "/dev/sda","reason": "disk_slow","block_stack": "bio","io_type": "read","alarm_source": "avg_block_io","details": "[data_elel]"}''' + xalarm_report(1001, 1, 2, alarm_info) \ No newline at end of file -- Gitee From 84a45b30a6ac403b361a36d70ee18966b31f7ed8 Mon Sep 17 00:00:00 2001 From: boluo1 <11274284+boluo56@user.noreply.gitee.com> Date: Fri, 27 Sep 2024 01:28:44 +0000 Subject: [PATCH 2/2] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=96=87=E4=BB=B6=20xala?= =?UTF-8?q?rm=5Ftest?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- xalarm_test/xalarm_sen.py | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 xalarm_test/xalarm_sen.py diff --git a/xalarm_test/xalarm_sen.py b/xalarm_test/xalarm_sen.py deleted file mode 100644 index ed79757..0000000 --- a/xalarm_test/xalarm_sen.py +++ /dev/null @@ -1,7 +0,0 @@ -from xalarm.sentry_notify import xalarm_report - - -if __name__ == "__main__": - - alarm_info='''{"driver_name": "/dev/sda","reason": "disk_slow","block_stack": "bio","io_type": "read","alarm_source": "avg_block_io","details": "[data_elel]"}''' - xalarm_report(1001, 1, 2, alarm_info) \ No newline at end of file -- Gitee