diff --git a/sysSentry-1.0.2/src/python/syssentry/callbacks.py b/sysSentry-1.0.2/src/python/syssentry/callbacks.py index a13c5272ad2e25ba854583312ee9ebccb5039d04..d0d0719bf7d79c770b0945783a35b9f8a74bbcc9 100644 --- a/sysSentry-1.0.2/src/python/syssentry/callbacks.py +++ b/sysSentry-1.0.2/src/python/syssentry/callbacks.py @@ -31,6 +31,17 @@ def task_get_status(mod_name): return "success", task.get_status() +def task_get_result(mod_name): + """get result by mod name""" + task = TasksMap.get_task_by_name(mod_name) + if not task: + return "failed", f"cannot find task by name {mod_name}" + if not task.load_enabled: + return "failed", f"mod {mod_name} is not enabled" + + return "success", task.get_result() + + def task_stop(mod_name): """stop by mod name""" ret = "success" diff --git a/sysSentry-1.0.2/src/python/syssentry/cron_process.py b/sysSentry-1.0.2/src/python/syssentry/cron_process.py index 23e26dc9efb09d689fca21f8d96aeb58d1a381b5..88e4cc648753cca9f817a47cc7dd453c6e91bd34 100644 --- a/sysSentry-1.0.2/src/python/syssentry/cron_process.py +++ b/sysSentry-1.0.2/src/python/syssentry/cron_process.py @@ -18,6 +18,8 @@ import time import logging import subprocess +from .utils import get_current_time_string +from .result import ResultLevel, RESULT_LEVEL_ERR_MSG_DICT from .global_values import InspectTask from .task_map import TasksMap, PERIOD_TYPE from .mod_status import set_runtime_status, WAITING_STATUS, RUNNING_STATUS, \ @@ -43,6 +45,15 @@ class PeriodTask(InspectTask): self.runtime_status = EXITED_STATUS def start(self): + """ + start function we use async mode + when we have called the start command, function return + """ + self.result_info["result"] = "" + self.result_info["start_time"] = get_current_time_string() + self.result_info["end_time"] = "" + self.result_info["error_msg"] = "" + self.result_info["details"] = {} if not self.period_enabled: self.period_enabled = True self.upgrade_period_timestamp() @@ -52,12 +63,16 @@ class PeriodTask(InspectTask): logfile = open(self.log_file, 'a') os.chmod(self.log_file, 0o600) except OSError: + self.result_info["result"] = ResultLevel.FAIL.name + self.result_info["error_msg"] = RESULT_LEVEL_ERR_MSG_DICT.get(ResultLevel.FAIL.name) logging.error("task %s log_file %s open failed", self.name, self.log_file) logfile = subprocess.PIPE try: child = subprocess.Popen(cmd_list, stdout=logfile, stderr=subprocess.STDOUT, close_fds=True) except OSError: + self.result_info["result"] = ResultLevel.FAIL.name + self.result_info["error_msg"] = RESULT_LEVEL_ERR_MSG_DICT.get(ResultLevel.FAIL.name) logging.error("period task %s start Popen failed", self.name) self.runtime_status = FAILED_STATUS return False, "period task start popen failed, invalid command" diff --git a/sysSentry-1.0.2/src/python/syssentry/global_values.py b/sysSentry-1.0.2/src/python/syssentry/global_values.py index e93591f7ca68bbc16801b651ee282db9d8331a9d..dbff46eb86cf6d84970066cb1a0670f44fef5169 100644 --- a/sysSentry-1.0.2/src/python/syssentry/global_values.py +++ b/sysSentry-1.0.2/src/python/syssentry/global_values.py @@ -18,6 +18,8 @@ import logging import time import os +from .result import ResultLevel, RESULT_LEVEL_ERR_MSG_DICT +from .utils import get_current_time_string SENTRY_RUN_DIR = "/var/run/sysSentry" CTL_SOCKET_PATH = "/var/run/sysSentry/control.sock" @@ -59,11 +61,25 @@ class InspectTask: self.period_enabled = True # load enabled self.load_enabled = True + # init result + self.result_info = { + "result": "", + "start_time": "", + "end_time": "", + "error_msg": "", + "details": {} + } - # start function we use async mode - # when we have called the start command, function return def start(self): - """start""" + """ + start function we use async mode + when we have called the start command, function return + """ + self.result_info["result"] = "" + self.result_info["start_time"] = get_current_time_string() + self.result_info["end_time"] = "" + self.result_info["error_msg"] = "" + self.result_info["details"] = {} if not self.period_enabled: self.period_enabled = True if self.runtime_status in ("EXITED", "FAILED"): @@ -72,12 +88,16 @@ class InspectTask: logfile = open(self.log_file, 'a') os.chmod(self.log_file, 0o600) except OSError: + self.result_info["result"] = ResultLevel.FAIL.name + self.result_info["error_msg"] = RESULT_LEVEL_ERR_MSG_DICT.get(ResultLevel.FAIL.name) logging.error("task %s log_file %s open failed", self.name, self.log_file) logfile = subprocess.PIPE try: child = subprocess.Popen(cmd_list, stdout=logfile, stderr=subprocess.STDOUT, close_fds=True) except OSError: logging.error("task %s start Popen error, invalid cmd") + self.result_info["result"] = ResultLevel.FAIL.name + self.result_info["error_msg"] = RESULT_LEVEL_ERR_MSG_DICT.get(ResultLevel.FAIL.name) self.runtime_status = "FAILED" return False, "start command is invalid, popen failed" finally: @@ -106,3 +126,7 @@ class InspectTask: def get_status(self): """get status""" return self.runtime_status + + def get_result(self): + """get result""" + return self.result_info diff --git a/sysSentry-1.0.2/src/python/syssentry/sentryctl b/sysSentry-1.0.2/src/python/syssentry/sentryctl index f6e636e42d51333530abe6a39d0448872c15c39a..e94491f110f12b692ed3d6461935e1f730063046 100644 --- a/sysSentry-1.0.2/src/python/syssentry/sentryctl +++ b/sysSentry-1.0.2/src/python/syssentry/sentryctl @@ -17,15 +17,25 @@ import socket import argparse import json import sys +import logging +import json CTL_SOCKET_PATH = "/var/run/sysSentry/control.sock" MAX_PARAM_LENGTH = 256 +RESULT_MSG_DATA_LEN = 4 +CTL_MSG_LEN_LEN = 3 def status_output_format(res_data): """format output""" print(f"status: {res_data}") +def result_output_format(res_data): + try: + print(json.dumps(res_data, indent=4)) + except json.decoder.JSONDecodeError: + logging.warning("result_output_format: result is \n%s\n, but json.dumps failed!") + print(res_data) def mod_list_output_format(res_data): """format output list""" @@ -45,6 +55,8 @@ def res_output_handle(res_struct, req_type): mod_list_output_format(res_struct['data']) elif req_type == 'get_status': status_output_format(res_struct['data']) + elif req_type == 'get_result': + result_output_format(res_struct['data']) elif res_struct['ret'] == "failed": print(res_struct['data']) @@ -55,7 +67,7 @@ def res_msg_serial(res_msg): return res_struct -def client_send_and_recv(request_data): +def client_send_and_recv(request_data, data_str_len): """client socket send and recv message""" try: client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) @@ -75,7 +87,7 @@ def client_send_and_recv(request_data): try: client_socket.send(request_msg.encode()) - res_data = client_socket.recv(6) + res_data = client_socket.recv(len("CTL") + data_str_len) res_data = res_data.decode() except (OSError, UnicodeError): client_socket.close() @@ -114,6 +126,8 @@ if __name__ == '__main__': parser_reload.add_argument('task_name') parser_status = subparsers.add_parser('status', help='get task status') parser_status.add_argument('task_name') + parser_get_result = subparsers.add_parser('get_result', help='get task result') + parser_get_result.add_argument('task_name') parser_list = subparsers.add_parser('list', help='show all loaded task mod') client_args = parser.parse_args() @@ -126,6 +140,8 @@ if __name__ == '__main__': req_msg_struct = {"type": "stop", "data": client_args.task_name} elif client_args.cmd_type == 'status': req_msg_struct = {"type": "get_status", "data": client_args.task_name} + elif client_args.cmd_type == 'get_result': + req_msg_struct = {"type": "get_result", "data": client_args.task_name} elif client_args.cmd_type == 'reload': req_msg_struct = {"type": "reload", "data": client_args.task_name} else: @@ -137,7 +153,10 @@ if __name__ == '__main__': sys.exit(-1) request_message = json.dumps(req_msg_struct) - result_message = client_send_and_recv(request_message) + if client_args.cmd_type == 'get_result': + result_message = client_send_and_recv(request_message, RESULT_MSG_DATA_LEN) + else: + result_message = client_send_and_recv(request_message, CTL_MSG_LEN_LEN) if not result_message: print("sentryctl: client_send_and_recv failed") sys.exit(-1) diff --git a/sysSentry-1.0.2/src/python/syssentry/syssentry.py b/sysSentry-1.0.2/src/python/syssentry/syssentry.py index 0e3f77370f7c5d6f4a7ca3617d90e73aeff4f867..40b56197c6efbd35b61b7364ae166a51423419cf 100644 --- a/sysSentry-1.0.2/src/python/syssentry/syssentry.py +++ b/sysSentry-1.0.2/src/python/syssentry/syssentry.py @@ -28,11 +28,14 @@ from .sentry_config import SentryConfig from .task_map import TasksMap from .global_values import SENTRY_RUN_DIR, CTL_SOCKET_PATH, SENTRY_RUN_DIR_PERM from .cron_process import period_tasks_handle -from .callbacks import mod_list_show, task_start, task_get_status, task_stop +from .callbacks import mod_list_show, task_start, task_get_status, task_stop, task_get_result from .mod_status import get_task_by_pid, set_runtime_status from .load_mods import load_tasks, reload_single_mod from .heartbeat import (heartbeat_timeout_chk, heartbeat_fd_create, heartbeat_recv, THB_SOCKET_PATH) +from .result import RESULT_MSG_HEAD_LEN, RESULT_MSG_MAGIC_LEN, RESULT_MAGIC +from .result import RESULT_LEVEL_ERR_MSG_DICT, ResultLevel +from .utils import get_current_time_string INSPECTOR = None @@ -50,6 +53,7 @@ type_func = { 'start': task_start, 'stop': task_stop, 'get_status': task_get_status, + 'get_result': task_get_result, 'reload': reload_single_mod } @@ -66,6 +70,9 @@ SYSSENTRY_LOG_FILE = "/var/log/sysSentry/sysSentry.log" SYSSENTRY_PID_FILE = "/var/run/sysSentry/sysSentry.pid" PID_FILE_FLOCK = None +# result-specific socket +RESULT_SOCKET_PATH = "/var/run/sysSentry/result.sock" + def msg_data_process(msg_data): """message data process""" @@ -101,6 +108,57 @@ def msg_data_process(msg_data): return res_msg +def process_and_update_task_result(result_msg_data: dict) -> bool: + ''' + result_msg_data format is as follows: + { + "task_name" : task_name, # (eg. npu_sentry), + "result_data" : { + "result": ResultLevel, + "details": {}, + }, + } + result is what 'sentryctl get-result task_name' needs to output + ''' + try: + data_struct = json.loads(result_msg_data) + except json.decoder.JSONDecodeError: + logging.error("result msg data process: json decode error") + return False + + if 'task_name' not in data_struct: + logging.error("recv message format error, task_name is not exist") + return False + + if 'result_data' not in data_struct: + logging.error("recv message format error, result is not exist") + return False + + task = TasksMap.get_task_by_name(data_struct['task_name']) + if not task: + logging.error("task '%s' do not exists!!", data_struct['task_name']) + return False + + result_data = data_struct.get("result_data", {}) + sentry_result = result_data.get("result") + sentry_detail = result_data.get("details") + if sentry_result not in ResultLevel.__members__: + logging.error("recv 'result' does not belong to ResultLevel members!") + return False + if not isinstance(sentry_detail, dict): + logging.debug("recv 'details' must be dict object, try to convert it to a dict obj") + try: + sentry_detail = json.loads(sentry_detail) + except (json.decoder.JSONDecodeError, TypeError): + logging.error("'details' is malformed, it should be a dictionary formatted string") + return False + task.result_info["result"] = sentry_result + task.result_info["details"] = sentry_detail + task.result_info["error_msg"] = RESULT_LEVEL_ERR_MSG_DICT.get(sentry_result, "") + + return True + + def msg_head_process(msg_head): """message head process""" ctl_magic = msg_head[:CTL_MSG_MAGIC_LEN] @@ -118,6 +176,22 @@ def msg_head_process(msg_head): return data_len +def result_msg_head_process(msg_head): + """result message head process""" + result_magic = msg_head[:RESULT_MSG_MAGIC_LEN] + if result_magic != RESULT_MAGIC: + logging.error("recv result msg head magic invalid: %s", result_magic) + return -1 + + data_len_str = msg_head[RESULT_MSG_MAGIC_LEN:RESULT_MSG_HEAD_LEN] + try: + data_len = int(data_len_str) + except ValueError: + logging.error("recv result msg data len is invalid %s", data_len_str) + return -1 + return data_len + + def server_recv(server_socket: socket.socket): """server receive""" try: @@ -154,8 +228,20 @@ def server_recv(server_socket: socket.socket): res_data = msg_data_process(msg_data_decode) logging.debug("res data %s", res_data) + cmd_type = "" + try: + msg_data_decode_dict = json.loads(msg_data_decode) + cmd_type = msg_data_decode_dict.get("type") + except json.JSONDecodeError: + logging.error("msg data process: msg_data_decode json decode error") + return + + res_head = RES_MAGIC - res_data_len = str(len(res_data)).zfill(CTL_MSG_MAGIC_LEN) + if cmd_type == "get_result": + res_data_len = str(len(res_data)).zfill(RESULT_MSG_HEAD_LEN - RESULT_MSG_MAGIC_LEN) + else: + res_data_len = str(len(res_data)).zfill(CTL_MSG_MAGIC_LEN) res_head += res_data_len logging.debug("res head %s", res_head) @@ -194,19 +280,98 @@ def server_fd_create(): return server_fd +def server_result_recv(server_socket: socket.socket): + """server result receive""" + try: + client_socket, _ = server_socket.accept() + logging.debug("server_fd listen ok") + except socket.error: + logging.error("server accept failed") + return + + try: + msg_head = client_socket.recv(RESULT_MSG_HEAD_LEN) + logging.debug("recv msg head: %s", msg_head.decode()) + data_len = result_msg_head_process(msg_head.decode()) + except (OSError, UnicodeError): + client_socket.close() + logging.error("server recv HEAD failed") + return + + logging.debug("msg data length: %d", data_len) + if data_len < 0: + client_socket.close() + logging.error("msg head parse failed") + return + + try: + msg_data = client_socket.recv(data_len) + msg_data_decode = msg_data.decode() + logging.info("server recv result data :\n%s\n", msg_data_decode) + except (OSError, UnicodeError): + client_socket.close() + logging.error("server recv MSG failed") + return + + # update result + process_plugins_result = "SUCCESS" + if not process_and_update_task_result(msg_data_decode): + process_plugins_result = "FAILED" + logging.error("process server recv MSG failed") + + # response to client + logging.info("result recv msg head : %s , response ...", process_plugins_result) + try: + client_socket.send(process_plugins_result.encode()) + except OSError: + logging.warning("server send reponse to plugins failed") + finally: + client_socket.close() + return + + +def server_result_fd_create(): + """create server result fd""" + if not os.path.exists(SENTRY_RUN_DIR): + logging.error("%s not exist, failed", SENTRY_RUN_DIR) + return None + try: + server_result_fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + server_result_fd.setblocking(False) + if os.path.exists(RESULT_SOCKET_PATH): + os.remove(RESULT_SOCKET_PATH) + + server_result_fd.bind(RESULT_SOCKET_PATH) + os.chmod(RESULT_SOCKET_PATH, 0o600) + server_result_fd.listen(CTL_LISTEN_QUEUE_LEN) + logging.debug("%s bind and listen", RESULT_SOCKET_PATH) + except socket.error: + logging.error("server result fd create failed") + server_result_fd = None + + return server_result_fd + + def main_loop(): """main loop""" server_fd = server_fd_create() if not server_fd: return + server_result_fd = server_result_fd_create() + if not server_result_fd: + server_fd.close() + return + heartbeat_fd = heartbeat_fd_create() if not heartbeat_fd: server_fd.close() + server_result_fd.close() return epoll_fd = select.epoll() epoll_fd.register(server_fd.fileno(), select.EPOLLIN) + epoll_fd.register(server_result_fd.fileno(), select.EPOLLIN) epoll_fd.register(heartbeat_fd.fileno(), select.EPOLLIN) logging.debug("start main loop") @@ -216,6 +381,8 @@ def main_loop(): for event_fd, _ in events_list: if event_fd == server_fd.fileno(): server_recv(server_fd) + elif event_fd == server_result_fd.fileno(): + server_result_recv(server_result_fd) elif event_fd == heartbeat_fd.fileno(): heartbeat_recv(heartbeat_fd) else: @@ -279,6 +446,7 @@ def sigchld_handler(signum, _f): set_runtime_status(task.name, "EXITED") else: set_runtime_status(task.name, "FAILED") + task.result_info["end_time"] = get_current_time_string() except: break