加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
execute.py 22.44 KB
一键复制 编辑 原始数据 按行查看 历史
liyujie 提交于 2023-10-25 20:14 . fix
# This file implements the main flow of test-suite execution
# encoding=utf-8
import os
import time
import platform
from constants import *
from json.decoder import JSONDecoder
from utils import command_execute, parse_json, Logger, now_date_time, CommandExecute
class ExecuteTestsHandler:
    """Run the test suites found in ``testsuites``, parse their logs and build reports.

    Each suite is pushed and executed through ``hdc`` commands, its console
    output is classified by :meth:`parse_log`, the counters on this object are
    updated, and finally an HTML failure report (and, when ``coverage`` is
    enabled, an lcov/genhtml coverage report) is generated.
    """

    # Final log lines that classify a suite as terminated by a signal.
    _SIGNALS = ("Signal 4", "Signal 6", "Signal 7", "Signal 11")

    def __init__(self, cmd_exe: "CommandExecute", config: dict, coverage: bool, subsystem='ace_engine'):
        """Store the collaborators and reset every counter.

        Args:
            cmd_exe: Object providing ``sftp_put`` and ``execute_no_log``.
            config: Parsed configuration; it is subscripted like a nested
                mapping (``config['personal']['code']['path']``).
            coverage: Whether coverage data is collected and reported.
            subsystem: Name of the subsystem under test.
        """
        self.config = config
        self.code_path = self.config['personal']['code']['path']
        self.subsystem = subsystem
        # Space-separated patterns passed to ``lcov -r``.
        self.sheild_dir = ""
        self.total_suites_count = 0
        self.total_tests_count = 0
        self.total_failed_tests_count = 0
        self.total_passed_tests_count = 0
        self.total_unavailable_suite_count = 0
        self.total_abnormal_suites_count = 0
        # parse_log() results, grouped by state.
        self.unavailable_test_suite_result = []
        self.abnormal_test_suite_result = []
        self.failed_test_suite_result = []
        self.cost_time = 0
        self.start_time = ""
        self.coverage = coverage
        self.cmd_exe = cmd_exe

    def _mkdir(self, path):
        """Create ``path`` (including parents) if it does not exist yet."""
        if os.path.exists(path):
            Logger.info(f"path {path} already exits")
            return
        # exist_ok guards against the directory appearing between the check
        # above and the creation below.
        os.makedirs(path, exist_ok=True)
        Logger.info(f"path {path} make successfully")

    def record_parse_log_result(self, parse_result, test_suite_name):
        """Fold one :meth:`parse_log` result into the running totals.

        Args:
            parse_result: Dictionary returned by :meth:`parse_log`.
            test_suite_name: Suite name used in the log messages.
        """
        state = parse_result['state']
        if state == 3:
            # undefined
            self.abnormal_test_suite_result.append(parse_result)
            self.total_abnormal_suites_count += 1
            Logger.error(
                f"[{test_suite_name}] test suite is in an undefined state, please check the result promptly")
        elif state == 2:
            # failed
            self.failed_test_suite_result.append(parse_result)
            total_count = parse_result['total_count']
            failed_count = parse_result['failed_count']
            passed_count = parse_result['passed_count']
            self.total_tests_count += total_count
            self.total_failed_tests_count += failed_count
            self.total_passed_tests_count += passed_count
            # Implicit concatenation keeps source layout out of the message.
            Logger.warning(
                f"[{test_suite_name}] partial failure, total_count is {total_count} tests, "
                f"failed_count is {failed_count} tests, passed_count is {passed_count} tests")
        elif state == 1:
            # passed
            total_count = parse_result['total_count']
            passed_count = parse_result['passed_count']
            self.total_tests_count += total_count
            self.total_passed_tests_count += passed_count
            Logger.info(
                f"[{test_suite_name}] execute successfully, "
                f"total_count is {total_count} tests, passed_count is {passed_count} tests")
        elif state == 0:
            # signal
            self.unavailable_test_suite_result.append(parse_result)
            self.total_unavailable_suite_count += 1
            signal = parse_result['signal']
            Logger.error(f"[{test_suite_name}] test suite is in {signal}")

    @staticmethod
    def _new_parse_result(test_suite_name):
        """Return a fresh parse result, pre-set to the UNDEFINED state (3)."""
        return {
            'suite_name': test_suite_name,
            'state': 3,
            'signal': "",
            'signal_log': [],
            'abnormal_log': "",
            'total_count': 0,
            'failed_count': 0,
            'passed_count': 0,
            'failed_log': {
                'name': [],
                'time': [],
                'log': []
            }
        }

    @staticmethod
    def _squash(text):
        """Collapse every run of whitespace in ``text`` to one space and trim it."""
        return " ".join(text.split())

    def _collect_failed_log(self, lines, failed_test_name, result):
        """Append name, duration and log lines of one failed test to ``result``."""
        run_marker = "[ RUN ] " + failed_test_name
        start = end = None
        for idx, raw in enumerate(lines):
            if self._squash(raw) != run_marker:
                continue
            # Only the first RUN line of the test is considered; its log ends
            # at the next line that mentions the test name again.
            start = idx + 1
            for nxt in range(start, len(lines)):
                if failed_test_name in lines[nxt]:
                    end = nxt
                    break
            break
        elapsed, body = 0, []
        if end is not None:
            body = lines[start:end]
            try:
                # The second-to-last token looks like "(2"; drop the "(".
                elapsed = int(lines[end].split()[-2][1:])
            except (IndexError, ValueError):
                elapsed = 0
        result['failed_log']['name'].append(failed_test_name)
        result['failed_log']['time'].append(elapsed)
        result['failed_log']['log'].append(body)

    def _classify_lines(self, lines, test_suite_name):
        """Return the parse result for ``lines``, or ``None`` when undefined.

        May raise ``IndexError``/``ValueError`` on an unexpected log layout;
        :meth:`parse_log` turns that into the UNDEFINED state.
        """
        if not lines:
            return None
        # Read the last line first.
        last_line = lines[-1].strip('\n')
        # A trailing "Mutex" line shifts everything of interest up by one.
        mutex_sign = "Mutex" in last_line
        if mutex_sign:
            last_line = lines[-2].strip('\n')
        result = self._new_parse_result(test_suite_name)

        # The last line is a signal.
        signal = last_line.strip()
        if signal in self._SIGNALS:
            result['state'] = 0
            result['signal'] = signal
            result['signal_log'] = lines
            return result

        # Markers are matched on whitespace-normalised text so that the
        # padding inside the brackets does not matter.
        summary = self._squash(last_line)

        # The last line is PASSED: the count is the first token after it.
        if "[ PASSED ]" in summary:
            total_count = int(summary.split("[ PASSED ]", 1)[1].split()[0])
            result['state'] = 1
            result['total_count'] = total_count
            result['passed_count'] = total_count
            return result

        if "FAILED TEST SUITE" in summary:
            return None

        # The last line is FAILED TEST(S).
        if "FAILED TEST" in summary:
            # The totals line is searched in the last 20 lines only.
            total_test_line = ""
            for raw in reversed(lines[-20:]):
                if "====" in raw:
                    total_test_line = raw
                    break
            failed_test_count = int(summary.split()[0])
            total_count = int(total_test_line.split()[1])
            result['state'] = 2
            result['total_count'] = total_count
            result['failed_count'] = failed_test_count
            result['passed_count'] = total_count - failed_test_count
            # The failed test names are listed just above the last line(s).
            offset = 3 if mutex_sign else 2
            failed_tests_name = []
            for back in range(failed_test_count, 0, -1):
                listed = lines[-offset - back]
                name = listed.split("]", 1)[1].split()[0]
                # Drop a trailing comma so the name can match its RUN line.
                failed_tests_name.append(name.rstrip(","))
            for failed_test_name in failed_tests_name:
                self._collect_failed_log(lines, failed_test_name, result)
            return result
        return None

    def parse_log(self, lines, test_suite_name, print_log=False):
        """Classify the console output of one test suite.

        Args:
            lines: Output lines of the suite, in order.
            test_suite_name: Name stored in the result as ``suite_name``.
            print_log: When true, every line is also logged at debug level.

        Returns:
            A dictionary whose ``state`` is one of:
              - 0: SIGNAL (``signal`` and ``signal_log`` are filled)
              - 1: PASSED (``total_count`` / ``passed_count`` are filled)
              - 2: FAILED (counts and ``failed_log`` are filled)
              - 3: UNDEFINED (``abnormal_log`` holds ``lines``)
            Empty output or an unexpected layout yields state 3 instead of
            raising.
        """
        if print_log:
            for line in lines:
                Logger.debug(line.strip('\n'))
        try:
            parsed = self._classify_lines(lines, test_suite_name)
        except (IndexError, ValueError):
            parsed = None
        if parsed is None:
            # Start from a clean result so no half-parsed counts leak out.
            parsed = self._new_parse_result(test_suite_name)
            parsed['abnormal_log'] = lines
        return parsed

    def execute_single_test(self, test_suite_name):
        """Push one suite, run it, record its result and fetch coverage data.

        Args:
            test_suite_name: File name of the suite inside ``testsuites``.
        """
        # Fixed sequence of hdc commands (defined in constants) that is run
        # before every suite.
        for prepare_cmd in (
                HDC_CLOCK,
                HDC_DELETE_FAULTLOG,
                HDC_DELETE_TEMP,
                HDC_DELETE_DEBUG,
                HDC_DELETE_FAULTLOGGER,
                HDC_HILOG_W_STOP,
                HDC_HILOG_CLEAR,
                HDC_DELETE_TEST,
                HDC_CREATE_TEST):
            command_execute(prepare_cmd)
        command_execute(
            f'cd testsuites && {HDC_FILE_SEND} {test_suite_name} /data/test')
        gcov_prefiex_length = len(self.code_path.split('/')) + 1
        gcov_cmd = f"hdc shell \"cd /data/test && rm -f {test_suite_name}.xml && chmod +x * && GCOV_PREFIX=. GCOV_PREFIX_STRIP={gcov_prefiex_length} ./{test_suite_name}\""
        output = command_execute(gcov_cmd)
        parse_result = self.parse_log(output, test_suite_name, print_log=True)
        self.record_parse_log_result(parse_result, test_suite_name)

        found = command_execute(HDC_FIND_OBJ)
        if not self.coverage:
            # Nothing to collect; do not report missing coverage data.
            return
        if len(found) == 0:
            Logger.error(f"{test_suite_name} coverage file gcda not generated")
            return
        obj_dir = found[0].strip('\n')
        command_execute(f"hdc shell tar -zcf /data/test/obj.tar.gz -C {obj_dir} .")
        command_execute(HDC_FILE_RECV_OBJ)
        try:
            # Upload exactly once; sftp_put may raise OSError.
            self.cmd_exe.sftp_put("temp/obj.tar.gz", f"{self.code_path}/out/rk3568/")
        except OSError as e:
            Logger.error(f"An OSError occurred: {e}")
            # Without a fresh archive there is nothing valid to extract.
            return
        self.cmd_exe.execute_no_log(f"cd {self.code_path}/out/rk3568 && tar -zxmf obj.tar.gz")

    def execute_all_tests(self):
        """Run every file found under ``testsuites`` and time the whole run."""
        self.start_time = now_date_time()
        start = time.perf_counter()
        for _, _, files in os.walk('testsuites'):
            for file_name in files:
                self.total_suites_count += 1
                self.execute_single_test(file_name)
        self.cost_time = time.perf_counter() - start

    def generate_sheild_dir(self):
        """Append the configured shield paths to ``self.sheild_dir``."""
        for shield in self.config['subsystem']['sheild_paths']:
            self.sheild_dir = self.sheild_dir + " " + shield
            Logger.info(f"Blocking rules ==> {shield}")

    def gernerate_coverage_html(self):
        """Run lcov (capture, then remove shielded paths) and genhtml via ``cmd_exe``."""
        llvm_gcov_path = self.config['personal']['code']['gcov']
        coverage_path = "coverage"
        self._mkdir(coverage_path)
        ace_engine_path = f"{self.code_path}/out/rk3568/obj/foundation/arkui/ace_engine"
        cd_path = f"cd {ace_engine_path}"
        lcov_cmd1 = f"lcov -d . -o cov_ace_engine.info -c --gcov-tool {llvm_gcov_path} --rc lcov_branch_coverage=1"
        lcov_cmd2 = f"lcov -r cov_ace_engine.info {self.sheild_dir} -o cov_ace_engine.info --rc lcov_branch_coverage=1"
        gen_html = f"genhtml -o {coverage_path} --ignore-errors source cov_ace_engine.info --branch-coverage"
        self.cmd_exe.execute_no_log(f"{cd_path} && {lcov_cmd1}")
        Logger.info("LCOV generate cov_ace_engine_info successfully")
        self.cmd_exe.execute_no_log(f"{cd_path} && {lcov_cmd2}")
        Logger.info("LCOV remove extra file successfully")
        self.cmd_exe.execute_no_log(f"{cd_path} && {gen_html}")
        Logger.info("LCOV generate html successfully")

    @staticmethod
    def _escape(text):
        """Return ``text`` with ``&``, ``<`` and ``>`` escaped for HTML content."""
        # Local import so the file's import block does not have to change.
        from html import escape
        return escape(str(text), quote=False)

    def handle_log(self, log):
        """Convert log lines into one HTML fragment.

        Each line is HTML-escaped, its spaces become ``&nbsp;``, surrounding
        newlines are removed and ``<br>`` is appended.

        Args:
            log: Iterable of log lines.

        Returns:
            The concatenated HTML string (empty for an empty ``log``).
        """
        return "".join(
            self._escape(line.strip('\n')).replace(" ", "&nbsp;") + "<br>"
            for line in log
        )

    def _suite_table_head(self, test_suite_name):
        """Return the opening rows (title + column headers) of a suite table."""
        title = self._escape(test_suite_name)
        return [
            "<table class='failure-test'>",
            "<tr>",
            "<th class='title' colspan='4'>",
            f"<span class='title'>{title}&nbsp;&nbsp;&nbsp;</span>",
            "</th>",
            "</tr>",
            "<tr>",
            "<th class='normal test'>Test</th>",
            "<th class='normal status'><div class='circle-normal circle-white'></div></th>",
            "<th class='normal result'>Result</th>",
            "<th class='normal details'>Details</th>",
            "</tr>",
        ]

    def _result_row(self, name, circle, verdict, details_html):
        """Return one result row; ``details_html`` must already be valid HTML."""
        return [
            "<tr>",
            f"<td class='normal test'>{self._escape(name)}</td>",
            f"<td class='normal status'><div class='circle-normal {circle}'></div></td>",
            f"<td class='normal result'>{verdict}</td>",
            f"<td class='normal details'>{details_html}</td>",
            "</tr>",
        ]

    def generate_html(self, exec_info, abnormal, unavailable, failed):
        """Write ``reports/failures.html`` from the collected results.

        Args:
            exec_info: Dictionary with the keys ``start_time``, ``cost_time``
                and the ``total_*`` counters.
            abnormal: parse results in the UNDEFINED state.
            unavailable: parse results in the SIGNAL state.
            failed: parse results in the FAILED state.
        """
        system_name = platform.system()
        start_time = exec_info['start_time']
        total_suites_count = exec_info['total_suites_count']
        total_unavailable_suite_count = exec_info['total_unavailable_suite_count']
        total_abnormal_suites_count = exec_info['total_abnormal_suites_count']
        total_run_suites_count = total_suites_count - \
            (total_unavailable_suite_count + total_abnormal_suites_count)
        total_tests_count = exec_info['total_tests_count']
        total_passed_tests_count = exec_info['total_passed_tests_count']
        total_failed_tests_count = exec_info['total_failed_tests_count']
        cost_time = round(exec_info['cost_time'], 0)

        # HTML head
        rows = [
            "<html version=\"1.0\" lang=\"en\">",
            "<head>",
            "<meta http-equiv=\"Content-Type\" content=\"text/html; charset=utf-8\"/>",
            "<title>Failures Report</title>",
            "<link href=\"../css/failures.css\" rel=\"stylesheet\" type=\"text/css\">",
            "</head>",
            "<body>",
            "<div class=\"container\">",
        ]
        # Execution info
        rows += [
            "<table class=\"exec-info\" id=\"summary\">",
            "<tr>",
            "<th colspan=\"4\">Test Summary</th>",
            "</tr>",
            "<tr>",
            "<td class=\"normal first\">Platform:</td>",
            "<td class=\"normal second\">RK3568</td>",
            "<td class=\"normal third\">Test Type:</td>",
            "<td class=\"normal fourth\">unittest</td>",
            "</tr>",
            "<tr>",
            "<td class=\"normal first\">Device Name:</td>",
            "<td class=\"normal second\">7001005458************5a385e3900</td>",
            "<td class=\"normal third\">Host Info:</td>",
            f"<td class=\"normal fourth\">{system_name}</td>",
            "</tr>",
            "<tr>",
            "<td class=\"normal first\">Test Start Time:</td>",
            f"<td class=\"normal second\">{start_time}</td>",
            "<td class=\"normal third\">Execution Time:</td>",
            f"<td class=\"normal fourth\">{cost_time}&nbsp;second</td>",
            "</tr>",
            "</table>",
        ]
        # Overall counters
        rows += [
            "<table class=\"summary\">",
            "<tr>",
            f"<th class=\"normal modules color-normal\">{total_suites_count}</th>",
            f"<th class=\"normal run-modules color-passed\">{total_run_suites_count}</th>",
            f"<th class=\"normal total-tests color-normal\">{total_tests_count}</th>",
            f"<th class=\"normal passed color-passed\">{total_passed_tests_count}</th>",
            f"<th class=\"normal failed color-failed\">{total_failed_tests_count}</th>",
            f"<th class=\"normal abnormal color-abnormal\">{total_abnormal_suites_count}</th>",
            f"<th class=\"normal unavailable color-unavailable\">{total_unavailable_suite_count}</th>",
            "</tr>",
            "<tr>",
            "<td class=\"normal modules\">Modules</td>",
            "<td class=\"normal run-modules\">Run Modules</td>",
            "<td class=\"normal total-tests\">Total Tests</td>",
            "<td class=\"normal passed\">Passed</td>",
            "<td class=\"normal failed\">Failed</td>",
            "<td class=\"normal abnormal\">Abnormal</td>",
            "<td class=\"normal unavailable\">Unavailable</td>",
            "</tr>",
            "</table>",
        ]
        # Abnormal suites
        abnormal_details = (
            "The test suite is in an abnormal state, "
            "possibly due to an issue with the log output. "
            "Please check it as soon as possible!")
        for result in abnormal:
            test_suite_name = result['suite_name']
            rows += self._suite_table_head(test_suite_name)
            rows += self._result_row(
                test_suite_name, "circle-unavailable", "abnormal", abnormal_details)
            rows.append("</table>")
        # Crashed suites
        for result in unavailable:
            test_suite_name = result['suite_name']
            signal_log = self.handle_log(result['signal_log'])
            rows += self._suite_table_head(test_suite_name)
            rows += self._result_row(
                test_suite_name, "circle-unavailable", "unavailable", signal_log)
            rows.append("</table>")
        # Suites with failed tests: one row per failed test
        for result in failed:
            rows += self._suite_table_head(result['suite_name'])
            failed_log = result['failed_log']
            for name, test_log in zip(failed_log['name'], failed_log['log']):
                rows += self._result_row(
                    name, "circle-failed", "failed", self.handle_log(test_log))
            rows.append("</table>")
        # HTML tail
        rows += ["</div>", "</body>", "</html>"]

        self._mkdir('reports')
        # utf-8 matches the charset declared in the page; the context manager
        # closes the file even if writing fails.
        with open("reports/failures.html", 'w', encoding='utf-8') as failure_html:
            failure_html.write("\n".join(rows) + "\n")
        Logger.info("Generate failures html for execution results")

    def gernerate_suite_execute_html(self):
        """Log the run totals and generate the test execution report."""
        Logger.info(f"Total number of test suites: {self.total_suites_count}")
        Logger.info(
            f"Total number of unavailable test suites: {self.total_unavailable_suite_count}")
        Logger.info(
            f"Total number of abnormal test suites: {self.total_abnormal_suites_count}")
        Logger.info(f"Total number of test cases: {self.total_tests_count}")
        Logger.info(
            f"Total number of test cases failed: {self.total_failed_tests_count}")
        Logger.info(
            f"Total test cases execute cost time: {self.cost_time} seconds")
        exec_info = {
            'total_suites_count': self.total_suites_count,
            'total_unavailable_suite_count': self.total_unavailable_suite_count,
            'total_abnormal_suites_count': self.total_abnormal_suites_count,
            'total_tests_count': self.total_tests_count,
            'total_passed_tests_count': self.total_passed_tests_count,
            'total_failed_tests_count': self.total_failed_tests_count,
            'cost_time': self.cost_time,
            'start_time': self.start_time,
        }
        self.generate_html(exec_info, self.abnormal_test_suite_result,
                           self.unavailable_test_suite_result, self.failed_test_suite_result)

    def process(self):
        """Run all suites, then build the coverage (if enabled) and failure reports."""
        self.execute_all_tests()
        if self.coverage:
            self.generate_sheild_dir()
            self.gernerate_coverage_html()
        self.gernerate_suite_execute_html()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化