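# (Web-page notice captured with this file: "Code pull complete; the page will refresh automatically." Not part of the program.)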
#! /usr/bin/env python3
# -*- coding: UTF-8 -*-
# Author: Bary Chen
import ctypes.wintypes
import json
import os
import sys
from copy import deepcopy
# import hashlib
import PIL.Image
import PIL.ImageEnhance
import PIL.ImageQt
import cv2
import grpc
import win32api
import win32con
import win32gui
from PyQt5 import QtCore, QtGui, QtWidgets
from optometry import OptometryModification
from school_workflow import StudyStatus
from tools import location_parse, qt, tasks, window_operation, engineering_staff, usb_detector
# from tools.encryption import encrypt
from tools.env_paras import LOG_FOLDER, DATABASE, USER_CONFIG_FOLDER, STUDY_VERSION, EMPTY_OPTOMETRY_DATA
from tools.logger import _logger, shutdown_log
from tools.network import import_trial_code
from ui_grpc import Orchestrator_pb2, ShowFailedProcess_pb2_grpc, ShowFailedProcess_pb2
from widgets.app_UI import Ui_MainWindow
from widgets.canvas import Canvas
from widgets.cloud_address import InputAddress
from widgets.label import IndicatorBox
from widgets.miscellaneous import DateTimeLabel, StatusLabel
from widgets.notice import Notice
from widgets.tool_bar import ToolBar
from engineering_code import ImportTrialCodeWidget
class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow):
def __init__(self, *args):
    """Build the main window: load configs, create widgets, wire signals, start workers.

    Args:
        *args: Launch arguments. When a second element exists and equals
            ``'--full'`` the window is shown full screen, otherwise maximized.

    The process exits with status 1 (after a critical message box) when
    ``config/config.json``, a school-mode config file, or
    ``config/ChineseNationData.json`` cannot be found.
    """
    super(MainWindow, self).__init__()
    self.setupUi(self)
    self.setWindowIcon(QtGui.QIcon('icons/rgb.ico'))
    # Replace the dock's title bar with an empty widget.
    self.dockWidget.setTitleBarWidget(QtWidgets.QWidget(flags=QtCore.Qt.Widget))
    self.setContextMenuPolicy(QtCore.Qt.NoContextMenu)  # disable the right-click menu
    # self.setWindowFlags(QtCore.Qt.FramelessWindowHint)
    self.full_screen_flag = len(args) > 1 and args[1] == '--full'
    self.is_chinese_system = QtCore.QLocale.system().name() == 'zh_CN'

    # ---------------- configuration files ----------------
    self.config_file = os.path.join(tasks.here, 'config/config.json')
    self.user_school_config_file = os.path.join(USER_CONFIG_FOLDER, 'school_config.json')
    if os.path.exists(self.config_file):
        with open(self.config_file, 'r') as f:
            self.config = json.load(f)
        self.original_config = deepcopy(self.config)
    else:
        QtWidgets.QMessageBox.critical(self, self.tr('Error'), self.tr('No config file exists!'),
                                       QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
        sys.exit(1)
    # The per-user school config takes precedence over the bundled one.
    if os.path.exists(self.user_school_config_file):
        config_file = self.user_school_config_file
    elif os.path.exists(p := os.path.join(tasks.here, 'config/school_config.json')):
        config_file = p
    else:
        config_file = ''
    if os.path.exists(config_file):
        with open(config_file, 'r') as f:
            self.school_config = json.load(f)
        self.original_school_config = deepcopy(self.school_config)
    else:
        QtWidgets.QMessageBox.critical(self, self.tr('Error'), self.tr('No school mode config file exists!'),
                                       QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
        sys.exit(1)
    if os.path.exists(p := os.path.join(tasks.here, 'config/ChineseNationData.json')):
        with open(p, 'r') as f:
            nation_location_data = json.load(f)
        self.nation_location = location_parse.ProvinceCityCounty(nation_location_data)
    else:
        QtWidgets.QMessageBox.critical(self, self.tr('Error'), self.tr('No nation location data file exists!'),
                                       QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
        sys.exit(1)
    # NOTE(review): when third_part.json is missing, self.third_config is never set,
    # yet it is read unconditionally further down (Eye Nurse menu and threads) —
    # confirm the file is guaranteed to exist.
    if os.path.exists(p := os.path.join(tasks.here, 'config/third_part.json')):
        with open(p, 'r') as f:
            self.third_config = json.load(f)
        self.original_third_config = deepcopy(self.third_config)
    if os.path.exists(p := os.path.join(tasks.here, 'config/custom.json')):
        with open(p, 'r', encoding='utf-8') as f:
            self.custom_config = json.load(f)
    self.label_25.setText(self.tr('Astigmatism Axial(%s)') % '°')  # multi-language translation issue
    self.switch_language()
    self.setWindowTitle(self.tr('MSI Study') + ' ' + STUDY_VERSION)
    self.logger = _logger(self.config['LogLevel'], 'StudyLog',
                          os.path.join(LOG_FOLDER, 'MSI Study.log'), self.config['LogKeepCount'])
    self.logger.info('-' * 30 + 'Start App' + '-' * 30)
    # self.logger.info(f'Received App arguments: {args}')

    # ---------------- widgets ----------------
    self.pushButton_Camera.hide()
    self.pushButton_Attention.hide()
    # Hide features that are not used for now; OCR is disabled for the time being.
    self.actionOpen_Camera.setVisible(False)
    self.actionGrab.setVisible(False)
    self.actionConnect_Instrument.setVisible(False)
    # self.scene_menu = QtWidgets.QMenu(self.tr('Scene'))  # create a menu that holds the scene actions
    # self.menu_Control.insertMenu(self.actionOpen_Camera, self.scene_menu)
    # qt.addActions(self.scene_menu, [self.actionNew, self.actionSave, self.actionSet_Default])
    # New cloud: remove the "update students' data" feature and the cloud configuration.
    self.actionUpdate_Students_Optometry.setVisible(False)
    self.actionConfig_Cloud_Address.setVisible(False)
    # Hide the orthokeratology lens widgets.
    self.label_29.hide()
    self.comboBox_Orthokeratology_Diopter_OD.hide()
    self.comboBox_Orthokeratology_Diopter_OS.hide()
    # Swap the designer-created label for a Canvas instance.
    del self.label_View
    self.label_View = Canvas(self)
    self.label_View.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)
    self.table_view.setSelectionBehavior(QtWidgets.QAbstractItemView.SelectRows)
    self.table_view.setSelectionMode(QtWidgets.QAbstractItemView.SingleSelection)
    self.table_view.setEditTriggers(QtWidgets.QAbstractItemView.NoEditTriggers)
    self.table_view.verticalHeader().setVisible(False)
    self.table_view.horizontalHeader().setVisible(True)
    self.table_view.horizontalHeader().setSectionResizeMode(QtWidgets.QHeaderView.Stretch)
    # The central area stacks the table view and the canvas.
    self.central_layout = QtWidgets.QStackedLayout()
    self.central_layout.addWidget(self.table_view)
    self.central_layout.addWidget(self.label_View)
    w = QtWidgets.QWidget(flags=QtCore.Qt.Widget)
    w.setLayout(self.central_layout)
    self.setCentralWidget(w)
    # Add the data-source selection for school screening.
    self.school_query_source_menu = QtWidgets.QMenu(self.tr('Query Source'), self)
    self.school_query_source_menu.setIcon(qt.newIcon('cloud'))
    self.action_enable_thondar = qt.newAction(self, self.tr('Thondar Cloud'), checkable=True,
                                              checked=self.config['SchoolMode']['UseThondarCloud'])
    self.action_enable_codeisland = qt.newAction(self, self.tr('CodeIsLand Cloud'), checkable=True,
                                                 checked=self.config['SchoolMode']['UseCodeIsLand'])
    self.action_group = QtWidgets.QActionGroup(self)
    self.action_group.addAction(self.action_enable_thondar)
    self.action_group.addAction(self.action_enable_codeisland)
    self.action_group.triggered.connect(self.change_student_query_cloud)
    self.school_query_source_menu.addActions((self.action_enable_thondar, self.action_enable_codeisland))
    self.school_query_source_menu.setEnabled(self.config['SchoolMode']['Activated'])
    self.menuViewMode.addMenu(self.school_query_source_menu)
    # In school-screening mode, add a popup menu controlling third-party apps.
    self.third_app_menu = QtWidgets.QMenu(self.tr('Third App Input'), self)
    self.third_app_menu.setIcon(qt.newIcon('app'))
    self.action_enable_eye_nurse = qt.newAction(self, self.tr('Enable Eye Nurse Input'),
                                                slot=self.eye_nurse_service,
                                                checkable=True,
                                                checked=self.third_config['EyeNurse']['Activated'])
    self.third_app_menu.addAction(self.action_enable_eye_nurse)
    self.third_app_menu.setEnabled(self.config['SchoolMode']['Activated'])
    self.menuViewMode.addMenu(self.third_app_menu)
    # Menu controlling whether third-party screening instruments are enabled.
    self.third_instrument_menu = QtWidgets.QMenu(self.tr('Third Instruments'), self)
    self.third_instrument_menu.setIcon(qt.newIcon('camera'))
    self.action_enable_vat300 = qt.newAction(self, self.tr('Enable VAT300'),
                                             slot=self.change_third_instruments_states,
                                             checkable=True,
                                             checked=self.config['SchoolMode']['VisualChartRequired'])
    self.action_enable_fr8900 = qt.newAction(self, self.tr('Enable FR8900'),
                                             slot=self.change_third_instruments_states,
                                             checkable=True,
                                             checked=self.config['SchoolMode']['RefractometerRequired'])
    self.third_instrument_menu.addActions((self.action_enable_vat300, self.action_enable_fr8900))
    self.third_instrument_menu.setEnabled(self.config['SchoolMode']['Activated'])
    self.menuViewMode.addMenu(self.third_instrument_menu)
    # Custom menu.
    self.action_enable_tianjin_eye_hospital = qt.newAction(self, self.tr('Enable Scanning Input'),
                                                           slot=self.change_custom_status,
                                                           checkable=True)
    self.action_tianjin_eye_hospital_confirm_automatically = qt.newAction(self, self.tr('Confirm Automatically'),
                                                                          slot=self.change_custom_status,
                                                                          checkable=True)
    # self.custom_group = QtWidgets.QActionGroup(self)
    self.tianjin_eye_hospital_menu = QtWidgets.QMenu(self.tr('Tianjin Eye Hospital Scanning Input'), self)
    self.tianjin_eye_hospital_menu.addAction(self.action_enable_tianjin_eye_hospital)
    self.tianjin_eye_hospital_menu.addAction(self.action_tianjin_eye_hospital_confirm_automatically)
    self.menuCustom.addMenu(self.tianjin_eye_hospital_menu)
    # Context menu (shown on the canvas).
    self.menu = QtWidgets.QMenu()
    new_id = qt.newAction(self, self.tr('ID'),
                          slot=lambda: self.add_box(self.tr('ID')),
                          icon='new')
    new_first_name = qt.newAction(self, self.tr('FirstName'),
                                  slot=lambda: self.add_box(self.tr('FirstName')),
                                  icon='new')
    new_middle_name = qt.newAction(self, self.tr('MiddleName'),
                                   slot=lambda: self.add_box(self.tr('MiddleName')),
                                   icon='new')
    new_last_name = qt.newAction(self, self.tr('LastName'),
                                 slot=lambda: self.add_box(self.tr('LastName')),
                                 icon='new')
    new_birth_date = qt.newAction(self, self.tr('BirthDate'),
                                  slot=lambda: self.add_box(self.tr('BirthDate')),
                                  icon='new')
    # new_age = qt.newAction(self, self.tr('Age'),
    #                        slot=lambda: self.add_box(self.tr('Age')),
    #                        icon='new')
    new_gender = qt.newAction(self, self.tr('Gender'),
                              slot=lambda: self.add_box(self.tr('Gender')),
                              icon='new')
    # new_address = qt.newAction(self, self.tr('Address'),
    #                            slot=lambda: self.add_box(self.tr('Address')),
    #                            icon='new')
    # new_department = qt.newAction(self, self.tr('Department'),
    #                               slot=lambda: self.add_box(self.tr('Department')),
    #                               icon='new')
    if self.is_chinese_system:
        # self.menu.addActions([new_id, new_last_name, new_birth_date, new_age,
        #                       new_gender, new_address, new_department])
        self.menu.addActions([new_id, new_last_name, new_birth_date, new_gender])
    else:
        # self.menu.addActions([new_id, new_first_name, new_middle_name, new_last_name,
        #                       new_birth_date, new_age, new_gender, new_address, new_department])
        self.menu.addActions([new_id, new_first_name, new_middle_name, new_last_name,
                              new_birth_date, new_gender])
    # self.label_notice = QtWidgets.QLabel(self.label_View)
    self.lineEdit_ID.setFocus()
    # self.lineEdit_Age.setText('0')
    self.dateEdit.setDate(QtCore.QDate.currentDate())
    self.lineEdit_ID.setValidator(qt.idValidator())
    self.lineEdit_Age.setValidator(qt.ageValidator())
    self.lineEdit_First_Name.setValidator(qt.nameValidator())
    self.lineEdit_Middle_Name.setValidator(qt.nameValidator())
    self.lineEdit_Last_Name.setValidator(qt.nameValidator())
    self.lineEdit_Pupil_Distance.setValidator(qt.pupilDistanceValidator())
    available_boxes = [self.tr('ID'), self.tr('FirstName'), self.tr('MiddleName'), self.tr('LastName'),
                       self.tr('BirthDate'), self.tr('Age'), self.tr('Gender'), self.tr('Address'),
                       self.tr('Department')]
    self.comboBox_Box.addItems(available_boxes)
    self.comboBox_Date_Format.addItems(self.config['AgeFormat']['DateFormat'])
    if self.config['AgeFormat']['Default'] in self.config['AgeFormat']['DateFormat']:
        self.comboBox_Date_Format.setCurrentText(self.config['AgeFormat']['Default'])
    if not self.config['EngineerMode']:
        self.dockWidget_EngineerMode.hide()
    # Hide the task count.
    # NOTE(review): both labels are assumed to belong to this condition — confirm.
    if not self.config['Instrument']['ShowTaskNumber']:
        self.label_15.hide()
        self.label_Tasks_Num.hide()
    if os.path.exists('scenes'):
        # Drop only a literal '.json' suffix. str.rstrip('.json') would strip any
        # trailing run of the characters '.', 'j', 's', 'o', 'n' and mangle names
        # such as 'version.json' -> 'versi'.
        suffix = '.json'
        configs = [scene[:-len(suffix)] if scene.endswith(suffix) else scene
                   for scene in os.listdir('scenes')]
        self.comboBox_Scene.addItems(configs)
        if self.config['Scene'] in configs:
            self.comboBox_Scene.setCurrentText(self.config['Scene'])
    self.actionOpen_Camera.setIcon(qt.newIcon('camera'))
    self.actionNew.setIcon(qt.newIcon('new'))
    self.actionSave.setIcon(qt.newIcon('save'))
    self.actionGrab.setIcon(qt.newIcon('grab'))
    self.actionConfirm.setIcon(qt.newIcon('confirm'))
    self.actionClose.setIcon(qt.newIcon('close'))
    self.actionQuery_Unchecked_Students.setIcon(qt.newIcon('SearchOnCloud'))
    self.actionUpdate_Students_Optometry.setIcon(qt.newIcon('glasses'))
    self.actionActivate_School_Mode.setIcon(qt.newIcon('school'))
    self.actionConfig_Cloud_Address.setIcon(qt.newIcon('config'))
    self.actionConnect_Instrument.setIcon(qt.newIcon('connect'))
    self.actionSet_Default.setIcon(qt.newIcon('default'))
    self.actionReconnect.setIcon(qt.newIcon('restart'))
    self.actionFailure_Report.setIcon(qt.newIcon('export'))
    self.actionRestart_Instrument_Quickly.setIcon(qt.newIcon('restart'))
    self.actionInstrument_Information.setIcon(qt.newIcon('system'))
    self.actionSynchronize_Time.setIcon(qt.newIcon('clock'))
    self.actionData_Manager.setIcon(qt.newIcon('data_manager_icon'))
    self.actionWindows_Mode.setIcon(qt.newIcon('window_setting'))
    self.actionSeal_Windows.setIcon(qt.newIcon('lock'))
    self.actionTrial_Code.setIcon(qt.newIcon('lock'))
    self.actionUpdateSoftware.setIcon(qt.newIcon('update'))
    self.actionAuto_Next.setChecked(self.config['Instrument']['AutoNext'])
    self.actionAuto_ID.setChecked(self.config['Instrument']['AutoID'])
    if self.is_chinese_system:
        self.actionActivate_School_Mode.setChecked(self.config['SchoolMode']['Activated'])
    else:
        self.actionActivate_School_Mode.setChecked(False)  # school mode is disabled on non-Chinese systems
    self.actionQuery_Unchecked_Students.setEnabled(self.config['SchoolMode']['Activated'])
    self.actionUpdate_Students_Optometry.setEnabled(self.config['SchoolMode']['Activated'])
    self.actionConfig_Cloud_Address.setEnabled(self.config['SchoolMode']['Activated'])
    # Action that launches the MSI report.
    self.report_starter_timer = QtCore.QTimer()  # guards against a double click opening it several times
    self.report_starter_timer.setInterval(600)
    self.report_starter_timer.timeout.connect(self.msi_report)
    self.action_msi_report = qt.newAction(self, self.tr('MSI Report'), slot=self.report_starter_timer.start,
                                          icon='msi_report')
    # Shutdown / restart actions.
    self.action_shutdown_windows = qt.newAction(self, self.tr('Shutdown PC'),
                                                slot=lambda: self.windows_shutdown_or_restart(shutdown=True),
                                                icon='close')
    self.action_restart_windows = qt.newAction(self, self.tr('Restart PC'),
                                               slot=lambda: self.windows_shutdown_or_restart(restart=True),
                                               icon='restart')
    # Glasses-ordering system action.
    self.glass_order_timer = QtCore.QTimer()  # guards against a double click opening it several times
    self.glass_order_timer.setInterval(600)
    self.glass_order_timer.timeout.connect(self.order_glasses)
    self.action_order_glasses = qt.newAction(self, self.tr('Order Glasses'), slot=self.glass_order_timer.start,
                                             icon='order_glass')
    self.lineEdit_ID.setEnabled(not self.actionAuto_ID.isChecked())
    self.groupBox_Location.setVisible(self.actionActivate_School_Mode.isChecked())
    # self.groupBox_Optometry.setVisible(self.actionActivate_School_Mode.isChecked())
    if not self.is_chinese_system:  # hide the optometry parameters on non-Chinese systems
        self.groupBox_Optometry.hide()
    self.pushButton_Grab.setIcon(qt.newIcon('grab'))
    self.pushButton_Confirm.setIcon(qt.newIcon('confirm'))
    self.pushButton_Attention.setIcon(qt.newIcon('warning'))
    self.label_Tasks_Num.setMinimumWidth(50)
    self.label_Tasks_Num.setAlignment(QtCore.Qt.AlignCenter)
    self.label_Tasks_Num.setText('<font color=#7efb56>%s</font>' % 0)
    # Populate the optometry parameter combo boxes.
    vision = [''] + [str(x / 10) for x in range(40, 54)]
    self.comboBox_Uncorrected_Vision_OD.addItems(vision)
    self.comboBox_Uncorrected_Vision_OS.addItems(vision)
    self.comboBox_Corrected_Vision_OD.addItems(vision)
    self.comboBox_Corrected_Vision_OS.addItems(vision)
    # +25.0 .. -25.0 in 0.25 steps, with an empty entry inserted in the middle.
    diopter = ['+' + str(x / 100) if x >= 0 else str(x / 100) for x in range(2500, -2501, -25)]
    diopter.insert(len(diopter) // 2, '')
    self.comboBox_Spherical_Diopter_OD.addItems(diopter)
    self.comboBox_Spherical_Diopter_OS.addItems(diopter)
    self.comboBox_Orthokeratology_Diopter_OD.addItems(diopter)
    self.comboBox_Orthokeratology_Diopter_OS.addItems(diopter)
    # self.comboBox_Spherical_Diopter_OD.setCurrentIndex(len(diopter) // 2)
    # self.comboBox_Spherical_Diopter_OS.setCurrentIndex(len(diopter) // 2)
    # +10.0 .. -10.0 in 0.25 steps, with an empty entry inserted in the middle.
    astigmatism_diopter = ['+' + str(x / 100) if x >= 0 else str(x / 100) for x in range(1000, -1001, -25)]
    astigmatism_diopter.insert(len(astigmatism_diopter) // 2, '')
    self.comboBox_Astigmatism_Degree_OD.addItems(astigmatism_diopter)
    self.comboBox_Astigmatism_Degree_OS.addItems(astigmatism_diopter)
    # self.comboBox_Astigmatism_Degree_OD.setCurrentIndex(len(astigmatism_diopter) // 2)
    # self.comboBox_Astigmatism_Degree_OS.setCurrentIndex(len(astigmatism_diopter) // 2)
    astigmatism_axial = [''] + [str(x) for x in range(181)]
    self.comboBox_Astigmatism_Axis_OD.addItems(astigmatism_axial)
    self.comboBox_Astigmatism_Axis_OS.addItems(astigmatism_axial)
    self.reset_optometry_widgets()
    # Do not let the user pick province / city / county.
    self.comboBox_Province.setEnabled(False)
    self.comboBox_City.setEnabled(False)
    self.comboBox_County.setEnabled(False)
    # Add the tool bar.
    self.tools = self.add_top_toolbar('', [self.action_msi_report, None, self.action_order_glasses, None,
                                           self.actionData_Manager, None, self.actionReconnect,
                                           self.actionRestart_Instrument_Quickly, self.actionFailure_Report])
    # Add the system-monitoring widget.
    self.system_status_label = StatusLabel(self)
    pix_size = self.system_status_label.label_network_status.height()
    self.network_connected_pix = QtGui.QPixmap('icons/net_connected.png').scaled(pix_size, pix_size)
    self.network_not_connected_pix = QtGui.QPixmap('icons/net_not_connected.png').scaled(pix_size, pix_size)
    self.green_disk_pix = QtGui.QPixmap('icons/green_disk.png').scaled(pix_size, pix_size)
    self.yellow_disk_pix = QtGui.QPixmap('icons/yellow_disk.png').scaled(pix_size, pix_size)
    self.red_disk_pix = QtGui.QPixmap('icons/red_disk.png').scaled(pix_size, pix_size)
    self.system_status_label.label_network_status.setPixmap(self.network_not_connected_pix)
    self.system_status_label.label_disk_usage.setPixmap(self.green_disk_pix)
    self.usb_driver_remove_menu = QtWidgets.QMenu('', self)
    # Remembers the popup action so it can be removed when the USB drive is unplugged.
    self.action_popup_usb_driver_remove_menu = None
    # self.system_status_label.label_usb.addAction(
    #     qt.newAction(self, '111123', slot=lambda: self.usb_driver_remove_menu.exec_(QtGui.QCursor.pos())))
    # Adjust the widgets depending on whether the window is shown full screen.
    if self.full_screen_flag:
        # Hide the button that closes the App.
        self.actionClose.setVisible(False)
        # Change the text of the "seal Windows" action.
        self.actionSeal_Windows.setText(self.tr('Unseal Windows'))
        # Tool bar widgets.
        self.tools.addSeparator()
        self.tools.addActions([None, self.action_restart_windows,
                               self.action_shutdown_windows])
        self.tools.add_spacer()
        self.tools.addWidget(self.system_status_label)
        self.tools.addSeparator()
        dt = DateTimeLabel(self)
        self.tools.addWidget(dt)
        dt.set_datetime()
    else:
        # Hide the "exit to Windows" button.
        self.actionWindows_Mode.setVisible(False)
        self.tools.add_spacer()
        self.tools.addWidget(self.system_status_label)

    # ---------------- parameters ----------------
    self.camera = None
    self.camera_timer = QtCore.QTimer()
    self.camera_timer.timeout.connect(self.show_video_stream)
    self.grab_flag = False
    self.image_clip = None
    self.current_box_geometry = None
    self.centered = False
    self.sleeper = tasks.SleepCounter(self, self.config['SleepDuration'])
    # self.image_display_zone = (self.config['Image']['Width'] - self.config['Image']['DisplayWidth']) / 2, \
    #                           (self.config['Image']['Height'] - self.config['Image']['DisplayHeight']) / 2, \
    #                           (self.config['Image']['Width'] + self.config['Image']['DisplayWidth']) / 2, \
    #                           (self.config['Image']['Height'] + self.config['Image']['DisplayHeight']) / 2,
    self.need_reset_pix_pos = False
    self.grpc_channel = None
    self.grpc_stub = None
    self.grpc_connection_thread = None
    self.red_dot = QtGui.QPixmap('icons/red.png').scaled(
        self.label_GRPC.height(), self.label_GRPC.height())
    self.green_dot = QtGui.QPixmap('icons/green.png').scaled(
        self.label_GRPC.height(), self.label_GRPC.height())
    self.current_tasks = 0
    self.patients_in_orthanc = []
    self.patients_on_cloud = []
    self.patients_to_show = []
    self.is_patient_selecting = False  # controls the logic while line edits are being filled, to avoid errors
    self.ocr_mode = False
    self.current_school = ''
    self.current_grades = []
    self.current_classes = []
    self.current_student_info = {}
    self.cloud_token = ''
    self.update_optometry_mode = False
    self.locale_students_in_database = []
    self.recent_students_in_orthanc = []
    self.mutex = QtCore.QMutex()  # database access lock
    self.msi_device_sn = window_operation.get_info_from_reg(window_operation.CURRENT_USER_HKEY,
                                                            window_operation.RegPathKey, window_operation.SNKey)
    self.msi_device_model = window_operation.get_info_from_reg(window_operation.CURRENT_USER_HKEY,
                                                               window_operation.RegPathKey,
                                                               window_operation.ModelKey)
    self.device_sw_version = ''
    self.device_algorithm = ''
    self.network_ip = None
    self.cloud_search_trigger = QtCore.QTimer()
    self.cloud_search_trigger.timeout.connect(self.search_students_on_cloud)
    self.orthanc_search_trigger = QtCore.QTimer()
    self.orthanc_search_trigger.timeout.connect(self.search_patients_in_orthanc)
    # self.check_fail_timer = QtCore.QTimer()
    # self.check_fail_timer.timeout.connect(self.check_failed_tasks)
    # self.check_fail_timer.start(30000)
    self.forbidden_key_triggered = False
    # The custom (Tianjin Eye Hospital) switches are read from the registry as the string 'True'.
    tianjin_eye_hospital_status = window_operation.get_info_from_reg(window_operation.CURRENT_USER_HKEY,
                                                                     window_operation.CustomPathKey,
                                                                     window_operation.TianjinEyeHospital)
    self.action_enable_tianjin_eye_hospital.setChecked(tianjin_eye_hospital_status == 'True')
    tianjin_eye_hospital_confirm_status = window_operation.get_info_from_reg(
        window_operation.CURRENT_USER_HKEY,
        window_operation.CustomPathKey,
        window_operation.TianjinEyeHospitalConfirm)
    self.action_tianjin_eye_hospital_confirm_automatically.setChecked(
        tianjin_eye_hospital_confirm_status == 'True')

    # ---------------- signal ----------------
    self.label_View.mouse_dragged.connect(self.show_rectangle)
    self.label_View.pix_scaled.connect(self.pix_resized)
    self.pushButton_Confirm.clicked.connect(self.confirm_new)
    self.pushButton_Grab.clicked.connect(self.grab_image)
    self.pushButton_Camera.clicked.connect(self.open_camera)
    self.pushButton_Save.clicked.connect(self.save_new_scene)
    self.pushButton_New_Scene.clicked.connect(self.new_scene)
    self.pushButton_Attention.clicked.connect(self.show_failed_task)
    self.comboBox_Scene.currentIndexChanged.connect(self.show_notice_box)
    # self.comboBox_Date_Format.currentIndexChanged.connect(self.save_as_default)
    self.table_view.itemDoubleClicked.connect(self.select_patient_to_fill)
    self.table_view.itemDoubleClicked.connect(self.open_workflow_dialog)
    # Typing (re)starts the 250 ms search timers.
    self.lineEdit_ID.textChanged.connect(lambda: self.orthanc_search_trigger.start(250))
    self.lineEdit_ID.textChanged.connect(lambda: self.cloud_search_trigger.start(250))
    self.lineEdit_Last_Name.textChanged.connect(lambda: self.orthanc_search_trigger.start(250))
    self.lineEdit_Last_Name.textChanged.connect(lambda: self.cloud_search_trigger.start(250))
    # Orthokeratology lens widgets follow the radio button.
    self.radioButton_Orthokeratology_Lens.toggled.connect(
        lambda: self.label_29.setVisible(self.radioButton_Orthokeratology_Lens.isChecked()))
    self.radioButton_Orthokeratology_Lens.toggled.connect(
        lambda: self.comboBox_Orthokeratology_Diopter_OD.setVisible(
            self.radioButton_Orthokeratology_Lens.isChecked()))
    self.radioButton_Orthokeratology_Lens.toggled.connect(
        lambda: self.comboBox_Orthokeratology_Diopter_OS.setVisible(
            self.radioButton_Orthokeratology_Lens.isChecked()))
    self.actionGrab.triggered.connect(self.grab_image)
    self.actionOpen_Camera.triggered.connect(self.open_camera)
    self.actionNew.triggered.connect(self.new_scene)
    self.actionSave.triggered.connect(self.save_new_scene)
    self.actionConfirm.triggered.connect(self.confirm_new)
    self.actionSet_Default.triggered.connect(self.save_as_default)
    self.actionConnect_Instrument.triggered.connect(self.connect_grpc)
    # self.actionAuto_Next.triggered.connect(self.save_as_default)
    # self.actionAuto_ID.triggered.connect(self.save_as_default)
    self.actionAuto_ID.triggered.connect(
        lambda: self.lineEdit_ID.setEnabled(not self.actionAuto_ID.isChecked()))
    self.actionActivate_School_Mode.triggered.connect(self.school_action_state_change)
    self.actionActivate_School_Mode.triggered.connect(
        lambda: self.groupBox_Location.setVisible(self.actionActivate_School_Mode.isChecked()))
    if self.is_chinese_system:
        self.actionActivate_School_Mode.triggered.connect(
            lambda: self.groupBox_Optometry.setVisible(not self.actionActivate_School_Mode.isChecked()))
    self.actionActivate_School_Mode.triggered.connect(
        lambda: self.actionQuery_Unchecked_Students.setEnabled(self.actionActivate_School_Mode.isChecked()))
    self.actionActivate_School_Mode.triggered.connect(
        lambda: self.actionUpdate_Students_Optometry.setEnabled(self.actionActivate_School_Mode.isChecked()))
    self.actionActivate_School_Mode.triggered.connect(
        lambda: self.actionConfig_Cloud_Address.setEnabled(self.actionActivate_School_Mode.isChecked()))
    self.actionActivate_School_Mode.triggered.connect(
        lambda: self.third_app_menu.setEnabled(self.actionActivate_School_Mode.isChecked()))
    self.actionActivate_School_Mode.triggered.connect(
        lambda: self.third_instrument_menu.setEnabled(self.actionActivate_School_Mode.isChecked()))
    self.actionActivate_School_Mode.triggered.connect(
        lambda: self.school_query_source_menu.setEnabled(self.actionActivate_School_Mode.isChecked()))
    self.actionQuery_Unchecked_Students.triggered.connect(self.query_unchecked_students_on_cloud)
    self.actionQuery_Unchecked_Students.triggered.connect(self.school_action_state_change)
    self.actionUpdate_Students_Optometry.triggered.connect(self.query_optometry_students_on_cloud)
    self.actionUpdate_Students_Optometry.triggered.connect(self.school_action_state_change)
    self.actionConfig_Cloud_Address.triggered.connect(self.config_cloud_address)
    self.actionReconnect.triggered.connect(self.restart_transfer)
    self.actionRestart_Instrument_Quickly.triggered.connect(self.restart_instrument)
    self.actionFailure_Report.triggered.connect(self.failure_report)
    self.actionInstrument_Information.triggered.connect(self.show_instrument_information)
    self.actionSynchronize_Time.triggered.connect(self.synchronize_time)
    self.actionData_Manager.triggered.connect(self.data_manager)
    self.actionWindows_Mode.triggered.connect(self.exit2windows)
    self.actionSeal_Windows.triggered.connect(self.seal_windows)
    self.actionTrial_Code.triggered.connect(self.import_trial_code)
    self.actionUpdateSoftware.triggered.connect(self.check_update_status)
    self.dateEdit.dateChanged.connect(self.calculate_age_from_date)
    self.lineEdit_Age.textChanged.connect(self.calculate_date_from_age)
    self.lineEdit_First_Name.textChanged.connect(self.auto_id)
    self.lineEdit_Middle_Name.textChanged.connect(self.auto_id)
    self.lineEdit_Last_Name.textChanged.connect(self.auto_id)
    self.dateEdit.dateChanged.connect(self.auto_id)
    self.radioButton_Male.clicked.connect(self.auto_id)
    self.radioButton_Female.clicked.connect(self.auto_id)
    self.radioButton_Unkown.clicked.connect(self.auto_id)
    self.label_View.customContextMenuRequested[QtCore.QPoint].connect(
        lambda: self.menu.exec_(QtGui.QCursor.pos()))
    self.sleeper.timeout.connect(self.open_camera)
    # Province / city / county linkage signals.
    self.comboBox_Province.currentIndexChanged.connect(self.change_nation_location)
    self.comboBox_City.currentIndexChanged.connect(self.change_nation_location)
    self.comboBox_Province.currentIndexChanged.connect(self.reset_patient_info)
    self.comboBox_City.currentIndexChanged.connect(self.reset_patient_info)
    self.comboBox_County.currentIndexChanged.connect(self.reset_patient_info)
    # Grade / class changes.
    self.comboBox_Grade.currentIndexChanged.connect(self.search_students_on_cloud)
    self.comboBox_Class.currentIndexChanged.connect(self.search_students_on_cloud)
    self.comboBox_Grade.currentIndexChanged.connect(self.reset_patient_info)
    self.comboBox_Class.currentIndexChanged.connect(self.reset_patient_info)
    # When the school name changes, clear the data already queried.
    self.lineEdit_School_Name.textChanged.connect(self.clear_cloud_queried)
    # Filter by examined / not examined.
    self.radioButton_Not_Examined.clicked.connect(lambda: self.cloud_search_trigger.start(250))
    self.radioButton_Examined.clicked.connect(lambda: self.cloud_search_trigger.start(250))

    # ---------------- staff ----------------
    if self.full_screen_flag:
        self.logger.info('Launch App full screen without title')
    else:
        self.logger.info('Launch App normal')
    # NOTE(review): these two calls are assumed to run in both launch modes — confirm.
    self.mode_change()
    self.school_action_state_change()
    # log_file = os.path.join(LOG_FOLDER, f'{QtCore.QDateTime.currentDateTime().toString("yyyyMMdd")}.log')
    # Every device indicator starts out red / "Not Ready".
    self.label_GRPC.setPixmap(self.red_dot)
    self.label_GRPC_Status.setText(self.tr('%s Not Ready') % self.tr('MSI Instrument'))
    self.label_Refractometer.setPixmap(self.red_dot)
    self.label_Refractometer_Status.setText(self.tr('%s Not Ready') % self.tr('Refractometer'))
    self.label_Visual_Chart.setPixmap(self.red_dot)
    self.label_Visual_Chart_Status.setText(self.tr('%s Not Ready') % self.tr('Vision Chart'))
    QtCore.QTimer.singleShot(500, self.connect_grpc)
    # Start the task server.
    # self.task_thread = tasks.GrpcTasksServer(self)
    # self.task_thread.tasks_changed.connect(self.update_tasks_num)
    # self.task_thread.show_hide.connect(lambda: self.setVisible(not self.isVisible()))
    # self.task_thread.failed_tasks.connect(self.show_warning_btn)
    # self.task_thread.start()
    # Query the orthanc data.
    tip = Notice(self, '<font color=SpringGreen>%s</font>'
                 % self.tr('Querying patients on orthanc, please wait...'))
    self.patients_query_in_orthanc_thread = tasks.GetPatientsInOrthanc(
        self, self.config['orthanc']['IP'],
        self.config['orthanc']['Port'],
        self.config['orthanc']['QueryInterval'],
        self.config['orthanc']['MaxWaitingTimeInstrumentQueryReady'])
    self.patients_query_in_orthanc_thread.queried.connect(self.update_patients_in_orthanc)
    self.patients_query_in_orthanc_thread.query_failed.connect(self.query_patients_failed)
    self.patients_query_in_orthanc_thread.ready_to_notice.connect(tip.show)
    self.patients_query_in_orthanc_thread.queried.connect(tip.close)
    self.patients_query_in_orthanc_thread.query_failed.connect(tip.close)
    self.patients_query_in_orthanc_thread.finished.connect(tip.close)
    self.patients_query_in_orthanc_thread.orthanc_is_not_running.connect(
        lambda msg: QtWidgets.QMessageBox.critical(self, self.tr('Error'), msg,
                                                   QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok))
    if not self.actionActivate_School_Mode.isChecked():  # outside school screening, start the local orthanc query
        self.logger.info('Start thread of querying patients in Orthanc')
        QtCore.QTimer.singleShot(1800, self.patients_query_in_orthanc_thread.start)
    self.school_parser = location_parse.SchoolGradeClass(self.patients_on_cloud)
    self.token_thread = tasks.GetTokenOfCloud(self, self.config['SchoolMode']['QueryIPAddress'],
                                              self.config['SchoolMode']['QueryPort'],
                                              self.config['SchoolMode']['TokenQueryInterval'],
                                              self.config['SchoolMode']['TokenQueryID'],
                                              self.config['SchoolMode']['TokenQueryKey'],
                                              self.config['SchoolMode']['QueryTimeout'])
    self.token_thread.query_failed.connect(self.query_patients_failed)
    # In school screening, start the token refresh automatically.
    if (self.is_chinese_system and self.actionActivate_School_Mode.isChecked()
            and self.action_enable_thondar.isChecked()):
        self.token_thread.start()
    # EYE Nurse broadcasting
    self.eye_nurse_broadcast_thread = tasks.EyeNurseBroadCast(
        self, self.third_config['EyeNurse']['BroadcastInterval'],
        broadcast_port=self.third_config['EyeNurse']['BroadcastPort'])
    # EYE Nurse patient-information receiver.
    self.eye_nurse_receive_thread = \
        tasks.EyeNursePatientIDService(self, local_port=self.third_config['EyeNurse']['LocalServerPort'])
    self.eye_nurse_receive_thread.patient_id_received.connect(lambda msg: self.lineEdit_ID.setText(msg))
    self.eye_nurse_service()
    # Fill in province / city / county from the saved school config.
    # NOTE(review): the school_config checks are assumed to sit inside this branch — confirm.
    if self.is_chinese_system:
        self.comboBox_Province.addItems(self.nation_location.get_province_names())
        if self.school_config['ExaminationAddress']:
            self.lineEdit_ExaminationAddress.setText(self.school_config['ExaminationAddress'])
        if self.school_config['Province']:
            self.comboBox_Province.setCurrentText(self.school_config['Province'])
        if self.school_config['City']:
            self.comboBox_City.setCurrentText(self.school_config['City'])
        if self.school_config['County']:
            self.comboBox_County.setCurrentText(self.school_config['County'])
        if self.school_config['SchoolName']:
            self.lineEdit_School_Name.setText(self.school_config['SchoolName'])
    # Initialise the optometry-data dialog.
    self.optometry_dialog = OptometryModification(self)
    self.optometry_dialog.modified.connect(self.update_optometry_on_cloud)
    self.optometry_dialog.dialog_closed.connect(self.reset_patient_info)
    # Initialise the optometry-workflow dialog.
    self.work_flow_dialog = StudyStatus(self)
    self.work_flow_dialog.fr8900_is_available.connect(lambda: self.label_Refractometer.setPixmap(self.green_dot))
    self.work_flow_dialog.fr8900_is_available.connect(
        lambda: self.label_Refractometer_Status.setText(self.tr('%s Ready') % self.tr('Refractometer')))
    self.work_flow_dialog.fr8900_not_available.connect(lambda: self.label_Refractometer.setPixmap(self.red_dot))
    self.work_flow_dialog.fr8900_not_available.connect(
        lambda: self.label_Refractometer_Status.setText(self.tr('%s Not Ready') % self.tr('Refractometer')))
    self.work_flow_dialog.vat300_is_available.connect(lambda: self.label_Visual_Chart.setPixmap(self.green_dot))
    self.work_flow_dialog.vat300_is_available.connect(
        lambda: self.label_Visual_Chart_Status.setText(self.tr('%s Ready') % self.tr('Vision Chart')))
    self.work_flow_dialog.vat300_not_available.connect(lambda: self.label_Visual_Chart.setPixmap(self.red_dot))
    self.work_flow_dialog.vat300_not_available.connect(
        lambda: self.label_Visual_Chart_Status.setText(self.tr('%s Not Ready') % self.tr('Vision Chart')))
    self.work_flow_dialog.dialog_closed.connect(self.reset_patient_info)
    self.work_flow_dialog.ready.connect(self.confirm_new)
    # Start the background processing thread; not enabled for now.
    # QtCore.QTimer.singleShot(2000, self.optometry_dialog.update_optometry_background)
    # Initialise the database update thread, whose finished signal refreshes the table;
    # the workflow has changed, so this feature may be unused.
    # self.update_database_thread = tasks.DatabaseOperation(self, DATABASE)
    # self.update_database_thread.error.connect(
    #     lambda msg: QtWidgets.QMessageBox.critical(self, self.tr('Error'), msg,
    #                                                QtWidgets.QMessageBox.Ok,
    #                                                QtWidgets.QMessageBox.Ok))
    # self.update_database_thread.finished.connect(self.search_students_on_cloud)
    # Monitor CPU, memory and disk usage, and the connection state of the ordering system.
    t = tasks.SystemMonitor(self, self.logger, self.config['SystemMonitor']['NetWorkCheckAddress'],
                            self.config['SystemMonitor']['NetWorkCheckPort'])
    t.orthanc_server_ip = self.config['orthanc']['IP']
    t.orthanc_server_port = self.config['orthanc']['Port']
    t.disk_usage_warning_threshold = self.config['SystemMonitor']['StorageUsedPercentWarningThreshold']
    t.cpu_usage_warning_threshold = self.config['SystemMonitor']['CPUUsedPercentWarningThreshold']
    t.memory_usage_warning_threshold = self.config['SystemMonitor']['MemoryUsedPercentWarningThreshold']
    t.disk_usage_warning.connect(
        lambda msg: QtWidgets.QMessageBox.information(self, self.tr('Information'), msg,
                                                      QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok))
    t.backup_remind_send.connect(
        lambda msg: QtWidgets.QMessageBox.information(self, self.tr('Information'), msg,
                                                      QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok))
    t.disk_usage_green.connect(lambda: self.system_status_label.label_disk_usage.setPixmap(self.green_disk_pix))
    t.disk_usage_green.connect(lambda msg: self.system_status_label.label_disk_usage.setToolTip(msg))
    t.disk_usage_yellow.connect(lambda: self.system_status_label.label_disk_usage.setPixmap(self.yellow_disk_pix))
    t.disk_usage_yellow.connect(lambda msg: self.system_status_label.label_disk_usage.setToolTip(msg))
    t.orthanc_is_not_running.connect(lambda: self.system_status_label.label_disk_usage.setPixmap(self.red_disk_pix))
    t.orthanc_is_not_running.connect(lambda msg: self.system_status_label.label_disk_usage.setToolTip(msg))
    t.orthanc_is_not_running.connect(
        lambda msg: QtWidgets.QMessageBox.critical(self, self.tr('Error'), msg,
                                                   QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok))
    t.cpu_usage.connect(lambda msg: self.system_status_label.label_cpu_usage.setText(msg))
    t.memory_usage.connect(lambda msg: self.system_status_label.label_memory_usage.setText(msg))
    t.network_status_info.connect(self.network_status)
    QtCore.QTimer.singleShot(1000, t.start)
    # USB plug / unplug monitoring.
    self.usb_driver_thread = usb_detector.USBDriverStaff(self, self.logger)
    self.usb_driver_thread.usb_driver_inserted.connect(self.add_usb_driver)
    self.usb_driver_thread.usb_driver_removed.connect(self.remove_usb_driver)
    self.usb_driver_thread.start()
    # -----------------------------------------------------------------------------------------
    # Show the window.
    if self.full_screen_flag:
        # self.setWindowFlags(QtCore.Qt.FramelessWindowHint)
        self.showFullScreen()
    else:
        self.showMaximized()
def reset_optometry_widgets(self):
    """Reset every optometry input widget to its default selection.

    The diopter and astigmatism-degree combo boxes are set to index
    ``count() // 2 - 1``; the astigmatism-axis and vision combo boxes are
    set to their first entry; the pupil-distance line edit is cleared.
    """
    # Combo boxes whose default entry sits just below the middle of the list.
    # Each widget is listed (and therefore reset) exactly once.
    centered_combos = (
        self.comboBox_Spherical_Diopter_OD,
        self.comboBox_Spherical_Diopter_OS,
        self.comboBox_Orthokeratology_Diopter_OD,
        self.comboBox_Orthokeratology_Diopter_OS,
        self.comboBox_Astigmatism_Degree_OD,
        self.comboBox_Astigmatism_Degree_OS,
    )
    for combo in centered_combos:
        combo.setCurrentIndex(combo.count() // 2 - 1)

    # Combo boxes whose default entry is the first one.
    first_entry_combos = (
        self.comboBox_Astigmatism_Axis_OD,
        self.comboBox_Astigmatism_Axis_OS,
        self.comboBox_Uncorrected_Vision_OD,
        self.comboBox_Uncorrected_Vision_OS,
        self.comboBox_Corrected_Vision_OD,
        self.comboBox_Corrected_Vision_OS,
    )
    for combo in first_entry_combos:
        combo.setCurrentIndex(0)

    self.lineEdit_Pupil_Distance.clear()
def reset_patient_info(self):
    """Clear the patient form and give the ID field the focus.

    Does nothing while ``self.is_patient_selecting`` is set, i.e. while the
    form is being filled from a selected row.
    """
    if not self.is_patient_selecting:
        id_edit = self.lineEdit_ID
        id_edit.clear()
        id_edit.setFocus()
        for name_edit in (self.lineEdit_Last_Name,
                          self.lineEdit_Middle_Name,
                          self.lineEdit_First_Name):
            name_edit.clear()
        self.dateEdit.setDate(QtCore.QDate.currentDate())
        # Passing None deselects the current table row.
        self.table_view.setCurrentItem(None)
        self.radioButton_No_Glasses.setChecked(True)
def school_action_state_change(self):
    """Adjust widgets and background threads after a school-mode action fired.

    The action that triggered the slot is read from ``self.sender()``:

    * ``actionUpdate_Students_Optometry`` switches to the "update optometry"
      view and returns.
    * ``actionQuery_Unchecked_Students`` switches to the "query unchecked
      students" view and returns.
    * ``actionActivate_School_Mode`` (or any other caller) continues with the
      activate/deactivate handling below, then resets the patient form and
      refills the table with unchecked students.

    NOTE(review): the nesting of the statements after the ``elif`` chain was
    reconstructed from a flattened source; confirm it against the original
    file before relying on it.
    """
    if self.sender() == self.actionUpdate_Students_Optometry:  # state for updating optometry data
        self.update_optometry_mode = True
        self.groupBox_Optometry.hide()
        self.pushButton_Confirm.hide()
        self.groupBox_Status_Filter.hide()
        self.radioButton_Not_Examined.setText(self.tr('Not Updated'))
        self.radioButton_Examined.setText(self.tr('Updated'))
        return
    elif self.sender() == self.actionQuery_Unchecked_Students:  # state while querying students awaiting examination
        self.update_optometry_mode = False
        self.groupBox_Optometry.hide()
        self.pushButton_Confirm.hide()
        self.groupBox_Status_Filter.hide()  # note: the status filter is temporarily hidden during screening
        self.radioButton_Not_Examined.setText(self.tr('Not Examined'))
        self.radioButton_Examined.setText(self.tr('Examined'))
        return
    elif self.sender() == self.actionActivate_School_Mode:
        self.update_optometry_mode = False
        # self.groupBox_Optometry.show()
        self.pushButton_Confirm.show()
    if not self.is_chinese_system and self.actionActivate_School_Mode.isChecked():  # school screening is disabled on non-Chinese systems
        # Un-check the action so the deactivation branch below runs.
        self.actionActivate_School_Mode.setChecked(False)
        self.groupBox_Optometry.hide()
        QtWidgets.QMessageBox.warning(self, self.tr('Warning'),
                                      self.tr(
                                          'School mode is designed for Chinese system platform, can not be activated on this system!'),
                                      QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
    if self.actionActivate_School_Mode.isChecked():
        self.groupBox_Optometry.hide()  # new workflow: the vision input widget is not shown
        self.pushButton_Confirm.hide()
        # Device status labels are shown only when the config requires the device.
        self.label_Refractometer.setVisible(self.config['SchoolMode']['RefractometerRequired'])
        self.label_Refractometer_Status.setVisible(self.config['SchoolMode']['RefractometerRequired'])
        self.label_Visual_Chart.setVisible(self.config['SchoolMode']['VisualChartRequired'])
        self.label_Visual_Chart_Status.setVisible(self.config['SchoolMode']['VisualChartRequired'])
        # self.label_Local_IP.show()
        self.actionActivate_School_Mode.setText(self.tr('Deactivate School Mode'))
        self.groupBox_Status_Filter.hide()  # note: the status filter is temporarily hidden during screening
        # Start the token thread (if it exists and is idle) when the thondar action is checked.
        if hasattr(self, 'token_thread') and not self.token_thread.isRunning() and \
                self.action_enable_thondar.isChecked():
            self.token_thread.start()
        # Stop the Orthanc patient query thread while school mode is active.
        if hasattr(self, 'patients_query_in_orthanc_thread') and \
                self.patients_query_in_orthanc_thread.isRunning():
            self.patients_query_in_orthanc_thread.terminate()
    else:
        self.pushButton_Confirm.show()
        self.label_Refractometer.hide()
        self.label_Refractometer_Status.hide()
        self.label_Visual_Chart.hide()
        self.label_Visual_Chart_Status.hide()
        # self.label_Local_IP.hide()
        self.groupBox_Status_Filter.hide()
        self.actionActivate_School_Mode.setText(self.tr('Activate School Mode'))
        if hasattr(self, 'token_thread') and self.token_thread.isRunning():
            self.token_thread.terminate()
        self.cloud_token = ''
        # Resume the Orthanc patient query thread once school mode is off.
        if hasattr(self, 'patients_query_in_orthanc_thread') and \
                not self.patients_query_in_orthanc_thread.isRunning():
            self.patients_query_in_orthanc_thread.start()
    self.reset_patient_info()
    self.patients_to_show.clear()
    self.fill_table_with_unchecked_students()
    # The eye-nurse service is only touched once both of its threads exist.
    if hasattr(self, 'eye_nurse_broadcast_thread') and hasattr(self, 'eye_nurse_receive_thread'):
        self.eye_nurse_service()
def mode_change(self):
    """Show or hide the OCR controls and select the matching central widget.

    With ``self.ocr_mode`` set, the grab button, both labels and the two
    combo boxes are shown and ``label_View`` becomes the current widget of
    ``central_layout``; otherwise they are hidden and ``table_view`` is
    selected.
    """
    ocr_only_widgets = (
        self.pushButton_Grab,
        self.label_14,
        self.label_13,
        self.comboBox_Date_Format,
        self.comboBox_Scene,
    )
    if not self.ocr_mode:
        for widget in ocr_only_widgets:
            widget.hide()
        self.central_layout.setCurrentWidget(self.table_view)
        return
    for widget in ocr_only_widgets:
        widget.show()
    # Switch the stack to the camera view.
    self.central_layout.setCurrentWidget(self.label_View)
def update_tasks_num(self, num):
    """Display the task count in the tasks label and remember it.

    A count of zero uses the colour ``#7efb56``; any other count uses
    ``#d3fb56``. The value is stored in ``self.current_tasks``.
    """
    template = '<font color=#7efb56>%s</font>' if num == 0 else '<font color=#d3fb56>%s</font>'
    self.label_Tasks_Num.setText(template % num)
    self.current_tasks = num
def show_warning_btn(self, task):
    """Hide the attention button when ``task`` contains 'Empty', else show it."""
    button = self.pushButton_Attention
    toggle = button.hide if 'Empty' in task else button.show
    toggle()
def show_failed_task(self):
    """Ask the local service on port 62041 to show its failed-process dialog.

    Opens an insecure gRPC channel to ``127.0.0.1:62041``, waits up to 0.5 s
    for it to become ready and sends ``ShowRequest(DoShow=True)``. If the
    channel is not ready in time, an error box is shown instead.

    The channel is closed on every path, including the timeout path.
    """
    channel = grpc.insecure_channel('127.0.0.1:62041')
    try:
        grpc.channel_ready_future(channel).result(timeout=0.5)
        stub = ShowFailedProcess_pb2_grpc.ShowFailedProcessDialogStub(channel)
        stub.ShowDialog(ShowFailedProcess_pb2.ShowRequest(DoShow=True))
    except grpc.FutureTimeoutError:
        QtWidgets.QMessageBox.critical(self, self.tr('Error'),
                                       self.tr('Image processing services are not running!'),
                                       QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
    finally:
        # Release the channel even when the readiness wait timed out.
        channel.close()
def save_as_default(self):
    """Copy the current UI choices into ``self.config`` and save it if changed.

    The scene, age format, auto-next, auto-ID and school-mode settings are
    read from their widgets. ``self.config_file`` is rewritten only when the
    resulting config differs from ``self.original_config``.
    """
    cfg = self.config
    cfg['Scene'] = self.comboBox_Scene.currentText()
    cfg['AgeFormat']['Default'] = self.comboBox_Date_Format.currentText()
    instrument = cfg['Instrument']
    instrument['AutoNext'] = self.actionAuto_Next.isChecked()
    instrument['AutoID'] = self.actionAuto_ID.isChecked()
    cfg['SchoolMode']['Activated'] = self.actionActivate_School_Mode.isChecked()
    if self.original_config == cfg:
        return  # nothing changed, leave the file untouched
    with open(self.config_file, 'w') as f:
        json.dump(cfg, f, indent=4, ensure_ascii=True)
def save_school_setting(self):
    """Copy the school form values into ``self.school_config`` and save if changed.

    ``self.user_school_config_file`` is rewritten only when the resulting
    config differs from ``self.original_school_config``.
    """
    settings = self.school_config
    settings['ExaminationAddress'] = self.lineEdit_ExaminationAddress.text().strip()
    settings['Province'] = self.comboBox_Province.currentText()
    settings['City'] = self.comboBox_City.currentText()
    settings['County'] = self.comboBox_County.currentText()
    settings['SchoolName'] = self.lineEdit_School_Name.text().strip()
    if self.original_school_config == settings:
        return  # nothing changed, leave the file untouched
    with open(self.user_school_config_file, 'w') as f:
        json.dump(settings, f, indent=4, ensure_ascii=True)
def save_third_setting(self):
    """Store the eye-nurse activation flag and save the third-party config if changed.

    ``config/third_part.json`` under ``tasks.here`` is rewritten only when
    ``self.third_config`` differs from ``self.original_third_config``.
    """
    self.third_config['EyeNurse']['Activated'] = self.action_enable_eye_nurse.isChecked()
    if self.original_third_config == self.third_config:
        return  # nothing changed, leave the file untouched
    target = os.path.join(tasks.here, 'config/third_part.json')
    with open(target, 'w') as f:
        json.dump(self.third_config, f, indent=4, ensure_ascii=True)
def connect_grpc(self):
    """(Re)start the background thread that connects to the instrument over gRPC.

    Any existing channel is closed and the status indicator is reset to
    "not ready". On the first call the connection thread is created and its
    ``succeeded`` / ``failed`` signals are wired to the status widgets; on
    later calls the existing thread is terminated and restarted.

    NOTE(review): the nesting of the indicator reset and of the final
    ``start()`` was reconstructed from a flattened source; confirm it against
    the original file.
    """
    if self.grpc_channel:
        self.grpc_channel.close()
        self.grpc_channel = None
        self.grpc_stub = None
    # Show "not ready" until the connection thread reports success.
    self.label_GRPC.setPixmap(self.red_dot)
    self.label_GRPC_Status.setText(self.tr('%s Not Ready') % self.tr('MSI Instrument'))
    if not self.grpc_connection_thread:
        self.grpc_connection_thread = tasks.InstrumentGrpcConnection(self)
        self.grpc_connection_thread.succeeded.connect(lambda: self.label_GRPC.setPixmap(self.green_dot))
        self.grpc_connection_thread.succeeded.connect(lambda: self.label_GRPC_Status.setText(
            self.tr('%s Ready') % self.tr('MSI Instrument')))
        # self.grpc_connection_thread.succeeded.connect(lambda: self.label_Instrument_SN.setText(
        #     self.tr('Instrument SN: %s') % '<font color=Goldenrod>%s</font>' % self.msi_device_sn))
        # self.grpc_connection_thread.succeeded.connect(
        #     lambda: self.setWindowTitle(self.windowTitle() + ' | SN: ' + self.msi_device_sn))
        self.grpc_connection_thread.succeeded.connect(self.record_instrument_sn)
        self.grpc_connection_thread.succeeded.connect(self.eye_nurse_service)
        self.grpc_connection_thread.failed.connect(lambda: self.label_GRPC.setPixmap(self.red_dot))
        self.grpc_connection_thread.failed.connect(lambda: self.label_GRPC_Status.setText(
            self.tr('%s Not Ready') % self.tr('MSI Instrument')))
    else:
        # Stop the previous attempt and wait for it to end before restarting.
        self.grpc_connection_thread.terminate()
        self.grpc_connection_thread.wait()
        self.grpc_connection_thread.connected = False
    self.grpc_connection_thread.start()
def switch_language(self):
    """Install the ``language/study_zh.qm`` translation on zh_CN systems.

    On any other locale nothing happens. On zh_CN the middle/first name
    edits and two labels are hidden first. If the translation file is
    missing the method stops there; otherwise the translator is installed
    on the application and the UI is retranslated.
    """
    if QtCore.QLocale.system().name() != 'zh_CN':
        return
    for widget in (self.lineEdit_Middle_Name, self.lineEdit_First_Name,
                   self.label_4, self.label_3):
        widget.hide()
    language_file = 'language/study_zh.qm'
    if not os.path.exists(language_file):
        return
    app_instance = QtWidgets.QApplication.instance()
    translator = QtCore.QTranslator(self)
    translator.load(language_file)
    app_instance.installTranslator(translator)
    self.retranslateUi(self)
def calculate_date_from_age(self):
    """Derive the birth date from the text in the age line edit.

    When the text consists only of decimal digits, the date edit is set to
    today minus that many years, but only if the age is non-zero and the age
    field currently has the focus. Empty or non-decimal text resets the date
    edit to today.
    """
    txt = self.lineEdit_Age.text()
    # str.isnumeric() also accepts characters such as '½', '²' or '五', which
    # int() cannot parse; str.isdecimal() matches what int() accepts.
    if txt and txt.isdecimal():
        age = int(txt)
        if age and self.lineEdit_Age.hasFocus():
            current_date = QtCore.QDate.currentDate()
            self.dateEdit.setDate(current_date.addYears(-age))
    else:
        self.dateEdit.setDate(QtCore.QDate.currentDate())
def calculate_age_from_date(self, date):
    """Write the age in whole 365-day years between ``date`` and today.

    A date in the future gives a negative age; in that case the date edit is
    reset to today and the age field is cleared before the computed age is
    written.
    """
    today = QtCore.QDate.currentDate()
    years = date.daysTo(today) // 365
    if years < 0:
        self.dateEdit.setDate(QtCore.QDate.currentDate())
        self.lineEdit_Age.clear()
    # NOTE(review): the negative value is still written after the reset above;
    # confirm whether that is intended.
    self.lineEdit_Age.setText(f'{years}')
def open_camera(self):
    """Toggle the OCR camera, based on the current text of the camera action.

    If the action reads "Open Camera", the capture device from
    ``config['OCRCamera']`` is opened at the configured size, the video
    stream and the scene boxes are started, and the sleeper thread is started
    when ``config['EnableSleep']`` is set. Otherwise the camera is released,
    the boxes are cleared and the sleeper is stopped. On success
    ``mode_change()`` is called; if the camera cannot be opened, a warning is
    shown and the method returns early.

    Any exception is reported in an error box.
    """
    try:
        if self.actionOpen_Camera.text() == self.tr('Open Camera'):
            self.camera = cv2.VideoCapture(self.config['OCRCamera'], cv2.CAP_DSHOW)
            self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, self.config['Image']['Width'])
            self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, self.config['Image']['Height'])
            if not self.camera.isOpened():
                QtWidgets.QMessageBox.warning(self, self.tr('Warning'),
                                              self.tr('No camera detected!'),
                                              QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
                return
            self.actionOpen_Camera.setText(self.tr('Close Camera'))
            self.ocr_mode = True
            self.show_video_stream()
            QtCore.QTimer.singleShot(50, self.show_notice_box)
            if not self.centered:
                self.center_ui()
            self.logger.info('Open camera')
            if self.config['EngineerMode']:
                self.label_View.draw_flag = False
            # Start the sleeper.
            if self.config['EnableSleep'] and not self.sleeper.isRunning():
                self.sleeper.counting = True
                self.sleeper.start()
        else:
            self.release_camera()
            self.ocr_mode = False
            self.actionOpen_Camera.setText(self.tr('Open Camera'))
            self.logger.info('Close camera')
            if self.config['EngineerMode']:
                self.label_View.draw_flag = True
            self.clear_boxes()
            # Stop the sleeper.
            if self.config['EnableSleep']:
                self.sleeper.counting = False
                self.sleeper.terminate()
        self.mode_change()
    except Exception as e:
        # QMessageBox expects a str; passing the exception object raises TypeError.
        QtWidgets.QMessageBox.critical(self, self.tr('Error'), str(e),
                                       QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
def release_camera(self):
    """Stop the frame timer, release an opened camera and close the gRPC channel.

    Each resource is only touched when it is active / opened / set; the
    corresponding attributes are reset to ``None`` afterwards.
    """
    timer = self.camera_timer
    if timer.isActive():
        timer.stop()

    camera = self.camera
    if camera and camera.isOpened():
        camera.release()
        self.camera = None

    channel = self.grpc_channel
    if channel:
        channel.close()
        self.grpc_channel = None
        self.grpc_stub = None
def show_video_stream(self):
    """Read one camera frame, display it and re-arm the frame timer.

    The frame is optionally mirrored, converted from BGR to RGB, scaled by
    ``label_View.scale`` and handed to ``label_View`` as a pixmap. When
    ``self.grab_flag`` is set, the scaled frame is stored in
    ``self.image_clip`` and ``read_info()`` is started. Finally the camera
    timer is restarted with a delay derived from ``config['Image']['FPS']``.
    """
    if self.camera.isOpened():
        success, frame = self.camera.read()
        if success:
            if self.config['Image']['Flip']:
                frame = cv2.flip(frame, 1)  # mirror the image horizontally
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frame = cv2.resize(frame, None, fx=self.label_View.scale,
                               fy=self.label_View.scale,
                               interpolation=cv2.INTER_LINEAR)
            # Qt's own image conversion must be used here; Pillow is too slow.
            # bytesPerLine is passed explicitly: without it QImage assumes
            # 32-bit aligned scanlines, which skews RGB888 frames whose row
            # length (width * 3) is not a multiple of 4 bytes.
            im = QtGui.QImage(frame.data, frame.shape[1], frame.shape[0],
                              frame.strides[0], QtGui.QImage.Format_RGB888)
            pix = QtGui.QPixmap.fromImage(im)
            if self.grab_flag:
                self.image_clip = frame
                self.grab_flag = False
                self.read_info()
            self.label_View.pix = pix
            if self.need_reset_pix_pos:  # position the pixmap before repainting so the image does not jump
                self.resizeEvent()
            if self.centered:  # repaint only after the UI has been adjusted
                self.label_View.repaint()
        self.camera_timer.start(1000 // self.config['Image']['FPS'])
def confirm_new(self, ready_data: list = None, cloud_student_info: dict = None):
    """Validate the patient form and send the patient to the instrument over gRPC.

    Steps, in order:

    1. Read ID, names, birth date, age and gender from the form; abort with an
       error box when the ID or last name is empty or the age is negative.
    2. On a Chinese system, resolve province/city/county/school/grade/class
       ids; otherwise use empty strings.
    3. Read the optometry widgets. When called by ``self.work_flow_dialog``
       with both arguments, the ids come from ``cloud_student_info`` and
       thirteen optometry values come from ``ready_data[12:25]``.
    4. In school mode, refuse to continue if the form no longer matches
       ``self.current_student_info``.
    5. Send an ``InfoRequest`` through ``self.grpc_stub.PatientInfo``.
    6. On a Chinese system, send a ``SchoolExaminationInfoRequest`` through
       ``self.grpc_stub.SchoolExaminationInfo``.

    :param ready_data: sequence supplied by the workflow dialog; only the
        slice ``[12:25]`` is read here.
    :param cloud_student_info: dict supplied by the workflow dialog; the keys
        ``record_id``, ``province_id``, ``city_id``, ``area_id``,
        ``school_id``, ``grade_id`` and ``class_id`` are read.

    NOTE(review): statement nesting was reconstructed from a flattened
    source. In particular, confirm that the ``EMPTY_OPTOMETRY_DATA``
    defaults belong inside the workflow branch.
    """
    patient_id = self.lineEdit_ID.text().strip()
    # An ID containing twelve '*' is replaced by the ID stored for the current
    # student when that stored ID passes qt.isChineseCitizenID.
    if '*' * 12 in patient_id and qt.isChineseCitizenID(self.current_student_info.get('ID', '')):
        patient_id = self.current_student_info.get('ID', '')
    first_name = self.lineEdit_First_Name.text().strip()
    middle_name = self.lineEdit_Middle_Name.text().strip()
    last_name = self.lineEdit_Last_Name.text().strip()
    birth_date = self.dateEdit.date()
    age = self.lineEdit_Age.text().strip()
    if age == '':
        age = 0
    # NOTE(review): int() raises ValueError for non-integer text; presumably
    # the age field is restricted to integers elsewhere (confirm).
    age = int(age)
    if not patient_id or not last_name or age < 0:
        QtWidgets.QMessageBox.critical(self, self.tr('Error'),
                                       self.tr('Make sure ID, last name and birth date are valid!'),
                                       QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
        return
    if self.radioButton_Male.isChecked():
        gender = self.radioButton_Male.text()
    elif self.radioButton_Female.isChecked():
        gender = self.radioButton_Female.text()
    else:
        gender = self.radioButton_Unkown.text()
    patient_info = {}
    examination_address = self.lineEdit_ExaminationAddress.text().strip()
    if self.is_chinese_system:
        province_name = self.comboBox_Province.currentText()
        # patient_info['Province'] = province_name
        province = self.nation_location.get_province_id_by_name(province_name)  # look up id
        city_name = self.comboBox_City.currentText()
        # patient_info['City'] = city_name
        city = self.nation_location.get_city_id_by_name(province_name, city_name)  # look up id
        county_name = self.comboBox_County.currentText()
        # patient_info['County'] = county_name
        county = self.nation_location.get_county_id_by_name(province_name, city_name, county_name)  # look up id
        school_name = self.current_school
        school = self.school_parser.get_school_id(school_name)  # look up id
        grade_name = self.comboBox_Grade.currentText()
        grade = self.school_parser.get_grade_id(grade_name)  # look up id
        class_name = self.comboBox_Class.currentText()
        class_number = self.school_parser.get_class_id(class_name)  # look up id
    else:
        province_name = ''
        province = ''
        city_name = ''
        city = ''
        county_name = ''
        county = ''
        school_name = ''
        school = ''
        grade_name = ''
        grade = ''
        class_name = ''
        class_number = ''
    # Glasses type is sent as a string code '1'..'4', chosen from the radio buttons.
    if self.radioButton_Frame_Glasses.isChecked():
        glasses_type = '1'
    elif self.radioButton_Contact_Lenses.isChecked():
        glasses_type = '2'
    elif self.radioButton_Orthokeratology_Lens.isChecked():
        glasses_type = '3'
    else:
        glasses_type = '4'
    uncorrected_vision_od = self.comboBox_Uncorrected_Vision_OD.currentText()
    uncorrected_vision_os = self.comboBox_Uncorrected_Vision_OS.currentText()
    corrected_vision_od = self.comboBox_Corrected_Vision_OD.currentText()
    corrected_vision_os = self.comboBox_Corrected_Vision_OS.currentText()
    spherical_diopter_od = self.comboBox_Spherical_Diopter_OD.currentText()
    spherical_diopter_os = self.comboBox_Spherical_Diopter_OS.currentText()
    orthokeratology_diopter_od = self.comboBox_Orthokeratology_Diopter_OD.currentText()
    orthokeratology_diopter_os = self.comboBox_Orthokeratology_Diopter_OS.currentText()
    equivalent_spherical_diopter_od = spherical_diopter_od  # equivalent spherical diopter
    equivalent_spherical_diopter_os = spherical_diopter_os
    astigmatism_degree_od = self.comboBox_Astigmatism_Degree_OD.currentText()
    astigmatism_degree_os = self.comboBox_Astigmatism_Degree_OS.currentText()
    astigmatism_axis_od = self.comboBox_Astigmatism_Axis_OD.currentText()
    astigmatism_axis_os = self.comboBox_Astigmatism_Axis_OS.currentText()
    # The remaining optometry items have no input widget here and are sent empty.
    cylindrical_diopter_od = ''
    cylindrical_diopter_os = ''
    cylinder_axis_od = ''
    cylinder_axis_os = ''
    pupillary_distance = self.lineEdit_Pupil_Distance.text().strip()
    pupil_size_od = ''
    pupil_size_os = ''
    steepest_radius_of_curvature_od = ''
    steepest_radius_of_curvature_os = ''
    steepest_degree_of_curvature_od = ''
    steepest_degree_of_curvature_os = ''
    steepest_direction_of_curvature_od = ''
    steepest_direction_of_curvature_os = ''
    flatest_radius_of_curvature_od = ''
    flatest_radius_of_curvature_os = ''
    flatest_degree_of_curvature_od = ''
    flatest_degree_of_curvature_os = ''
    flatest_direction_of_curvature_od = ''
    flatest_direction_of_curvature_os = ''
    average_radius_of_curvature_od = ''
    average_radius_of_curvature_os = ''
    average_degree_of_curvature_od = ''
    average_degree_of_curvature_os = ''
    record_id = 0
    # Handle data sent by the workflow dialog.
    if self.sender() == self.work_flow_dialog and ready_data and cloud_student_info:
        record_id = cloud_student_info.get('record_id', 0)
        province = cloud_student_info.get('province_id')
        city = cloud_student_info.get('city_id')
        county = cloud_student_info.get('area_id')
        school = cloud_student_info.get('school_id')
        grade = cloud_student_info.get('grade_id')
        class_number = cloud_student_info.get('class_id')
        # Thirteen consecutive values of ready_data, in exactly this order.
        uncorrected_vision_od, corrected_vision_od, uncorrected_vision_os, corrected_vision_os, \
            pupillary_distance, spherical_diopter_od, astigmatism_degree_od, astigmatism_axis_od, \
            equivalent_spherical_diopter_od, spherical_diopter_os, astigmatism_degree_os, \
            astigmatism_axis_os, equivalent_spherical_diopter_os = ready_data[12:25]
        # Missing values are replaced by the EMPTY_OPTOMETRY_DATA placeholder.
        if uncorrected_vision_od in ('', None):
            uncorrected_vision_od = EMPTY_OPTOMETRY_DATA
        if uncorrected_vision_os in ('', None):
            uncorrected_vision_os = EMPTY_OPTOMETRY_DATA
        if corrected_vision_od in ('', None):
            corrected_vision_od = EMPTY_OPTOMETRY_DATA
        if corrected_vision_os in ('', None):
            corrected_vision_os = EMPTY_OPTOMETRY_DATA
        if spherical_diopter_od in ('', None):
            spherical_diopter_od = EMPTY_OPTOMETRY_DATA
        if spherical_diopter_os in ('', None):
            spherical_diopter_os = EMPTY_OPTOMETRY_DATA
        if orthokeratology_diopter_od in ('', None):
            orthokeratology_diopter_od = EMPTY_OPTOMETRY_DATA
        if orthokeratology_diopter_os in ('', None):
            orthokeratology_diopter_os = EMPTY_OPTOMETRY_DATA
    if self.is_chinese_system and self.actionActivate_School_Mode.isChecked():
        # In school mode the form must still match the selected student exactly.
        patient_info.update({'ID': patient_id, 'Name': last_name, 'Gender': gender,
                             'Birthday': [birth_date.year(), birth_date.month(), birth_date.day()],
                             'Grade': grade_name, 'Class': class_name})
        # print(patient_info)
        # print(self.current_student_info)
        if patient_info != self.current_student_info:
            QtWidgets.QMessageBox.critical(self, self.tr('Error'),
                                           self.tr('Do not change any information of this student!'),
                                           QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
            return
    if not self.grpc_stub:
        QtWidgets.QMessageBox.critical(self, self.tr('Error'),
                                       self.tr('MSI Study has not connected to instrument!'),
                                       QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
        return
    try:
        # Send the patient information.
        patient_request = Orchestrator_pb2.InfoRequest()
        patient_request.ID = patient_id
        patient_request.FirstName = first_name
        patient_request.MiddleName = middle_name
        patient_request.LastName = last_name
        patient_request.Gender = gender
        patient_request.BirthDate.Year = birth_date.year()
        patient_request.BirthDate.Month = birth_date.month()
        patient_request.BirthDate.Day = birth_date.day()
        patient_request.AutoNext = self.actionAuto_Next.isChecked()
        patient_request.Address = ''
        patient_request.Department = ''
        self.logger.info(
            f'Send {patient_id} {last_name} {gender} {birth_date.toString("yyyy/MM/dd")} to instrument')
        response = self.grpc_stub.PatientInfo(patient_request)
        if not response.Success:
            self.logger.critical(f'Failed to send {patient_id} {last_name} to instrument, '
                                 f'response:{response.Success}')
            QtWidgets.QMessageBox.critical(self, self.tr('Error'),
                                           self.tr('Set patient information failed! '
                                                   'Please check the connection status and make sure '
                                                   'current page is patient information page on device!'),
                                           QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
            return
        else:
            self.logger.info('Orchestrator response true')
        # Clear the queried patient information.
        self.reset_patient_info()
        if not self.actionActivate_School_Mode.isChecked():  # do not clear while in school screening mode
            self.patients_to_show.clear()
            self.fill_table_with_patients_in_orthanc()
    except grpc.RpcError:
        QtWidgets.QMessageBox.critical(self, self.tr('Error'),
                                       self.tr('No valid gRPC connection with instrument! Reasons below:\n'
                                               '1.RJ45 lost connection.\n'
                                               '2.The instrument does not support new patient remotely.'),
                                       QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
        return
    except Exception as e:
        self.logger.critical('new patient exception ' + str(e))
        QtWidgets.QMessageBox.critical(self, self.tr('Error'),
                                       self.tr('Unexpected error:\n%s') % e,
                                       QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
        return
    if not self.is_chinese_system:  # school screening is not supported on non-Chinese systems
        self.reset_optometry_widgets()
        return
    try:
        # Send the school screening information.
        school_examination_request = Orchestrator_pb2.SchoolExaminationInfoRequest()
        if self.actionActivate_School_Mode.isChecked():
            school_examination_request.StudentAddress.Class = str(class_number).strip()
            school_examination_request.StudentAddress.Grade = str(grade).strip()
            school_examination_request.StudentAddress.School = str(school).strip()
            school_examination_request.StudentAddress.County = str(county).strip()
            school_examination_request.StudentAddress.City = str(city).strip()
            school_examination_request.StudentAddress.Province = str(province).strip()
            school_examination_request.StudentAddress.ExaminationAddress = examination_address
            school_examination_request.StudentAddress.RecordId = str(record_id).strip()
        else:
            school_examination_request.StudentAddress.Class = ''
            school_examination_request.StudentAddress.Grade = ''
            school_examination_request.StudentAddress.School = ''
            school_examination_request.StudentAddress.County = ''
            school_examination_request.StudentAddress.City = ''
            school_examination_request.StudentAddress.Province = ''
            school_examination_request.StudentAddress.ExaminationAddress = ''
            school_examination_request.StudentAddress.RecordId = ''
        # Twenty optometry items are added per eye, right eye (OD) first; the
        # order of each tuple is the order in which they are added.
        for eye in ('OD', 'OS'):
            if eye == 'OD':
                values = (spherical_diopter_od, astigmatism_degree_od, astigmatism_axis_od,
                          cylindrical_diopter_od, cylinder_axis_od, pupillary_distance,
                          pupil_size_od, steepest_radius_of_curvature_od, steepest_degree_of_curvature_od,
                          steepest_direction_of_curvature_od, flatest_radius_of_curvature_od,
                          flatest_degree_of_curvature_od, flatest_direction_of_curvature_od,
                          average_radius_of_curvature_od, average_degree_of_curvature_od, glasses_type,
                          uncorrected_vision_od, corrected_vision_od, equivalent_spherical_diopter_od,
                          orthokeratology_diopter_od)
            else:
                values = (spherical_diopter_os, astigmatism_degree_os, astigmatism_axis_os,
                          cylindrical_diopter_os, cylinder_axis_os, pupillary_distance,
                          pupil_size_os, steepest_radius_of_curvature_os, steepest_degree_of_curvature_os,
                          steepest_direction_of_curvature_os, flatest_radius_of_curvature_os,
                          flatest_degree_of_curvature_os, flatest_direction_of_curvature_os,
                          average_radius_of_curvature_os, average_degree_of_curvature_os, glasses_type,
                          uncorrected_vision_os, corrected_vision_os, equivalent_spherical_diopter_os,
                          orthokeratology_diopter_os)
            for value in values:
                school_examination_request.OptometryItems.add(Value=value, Eye=eye)
        response = self.grpc_stub.SchoolExaminationInfo(school_examination_request)
        if not response.Received:
            self.logger.critical(f'Failed to send school and optometry info '
                                 f'of {patient_id} {last_name} to instrument')
            QtWidgets.QMessageBox.critical(self, self.tr('Error'),
                                           self.tr(f'Failed to send school and optometry info '
                                                   f'to instrument, pleas retry!'),
                                           QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
        else:
            self.reset_optometry_widgets()
            # Record this student in the database  **** feature cancelled ****
            # student_info = (patient_id, last_name, gender, birth_date.toString('yyyy/MM/dd'),
            #                 province_name, city_name, county_name,
            #                 school_name, grade_name, class_name, 0)
            # if student_info not in self.locale_students_in_database:
            #     self.locale_students_in_database.append(student_info[:-1])
            #     self.update_database_thread.school = school_name
            #     self.update_database_thread.student_info = student_info
            #     self.update_database_thread.start()
    except grpc.RpcError:
        QtWidgets.QMessageBox.critical(self, self.tr('Error'),
                                       self.tr('No school mode in the device!'),
                                       QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
    except Exception as e:
        self.logger.critical('new optometry exception ' + str(e))
        QtWidgets.QMessageBox.critical(self, self.tr('Error'),
                                       self.tr('Unexpected error:\n%s') % e,
                                       QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
    self.lineEdit_ID.setFocus()  # re-focus the ID input once sending is finished
def grab_image(self):
    """Request that the next camera frame be captured for OCR.

    Shows an information box when no camera is open. Otherwise the grab flag
    is set, the sleeper's start stamp is reset to now, and the grab button is
    disabled for 1.5 s.
    """
    if not (self.camera and self.camera.isOpened()):
        QtWidgets.QMessageBox.information(self, self.tr('Information'),
                                          self.tr('Camera is not opened!'),
                                          QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
        return
    self.grab_flag = True
    # Reset the sleeper's timing.
    self.sleeper.start_stamp = QtCore.QDateTime.currentDateTime()
    grab_button = self.pushButton_Grab
    grab_button.setEnabled(False)
    QtCore.QTimer.singleShot(1500, lambda: self.pushButton_Grab.setEnabled(True))
    self.logger.info('grab')
def read_info(self):
    """Start one OCR worker per mark box on the grabbed frame.

    For every mark box the matching region of ``self.image_clip`` is cropped,
    contrast-enhanced by ``config['Image']['Contrast']`` and converted to
    greyscale. A ``tasks.OCR`` thread is then started whose ``result`` signal
    feeds ``ocr_result``. The box labelled 'ID' is always read with the 'eng'
    language. Any exception is logged.
    """
    try:
        if self.image_clip is None:
            return
        for box in self.mark_boxes:
            left, top, width, height = box.geometry().getRect()
            # Box geometry is relative to the label; shift it by the pixmap draw position.
            left -= self.label_View.draw_pos.x()
            top -= self.label_View.draw_pos.y()
            region = self.image_clip[int(top):int(top + height), int(left):int(left + width)]
            region = PIL.Image.fromarray(region)
            # Contrast enhancement followed by greyscale conversion.
            region = PIL.ImageEnhance.Contrast(region).enhance(self.config['Image']['Contrast'])
            region = region.convert('L')
            if box.text() == self.tr('ID'):
                lang = 'eng'
            else:
                lang = self.config.get('OCRLanguage', 'chi_sim')
            worker = tasks.OCR(self, box.text(), region, lang)
            worker.result.connect(self.ocr_result)
            worker.finished.connect(worker.deleteLater)
            worker.start()
    except Exception as e:
        self.logger.critical('ocr read exception ' + str(e))
def ocr_result(self, box_text, info):
    """Write one OCR result into the form field that matches ``box_text``.

    :param box_text: label of the mark box the text was read from; it is
        compared against the translated field names.
    :param info: recognised text.

    Any exception raised while filling the form is logged.
    """
    self.logger.info(f'Get info of {box_text}, {info}')
    try:
        if not self.config['Instrument']['AutoID'] and box_text == self.tr('ID'):
            self.lineEdit_ID.setText(info)
            on_id_card = self.comboBox_Scene.currentText() == '身份证'
            if on_id_card and len(info) == 18:
                # Characters 7-14 of the 18-character ID are parsed as yyyyMMdd.
                year, month, day = int(info[6:10]), int(info[10:12]), int(info[12:14])
                self.dateEdit.setDate(QtCore.QDate(year, month, day))
        elif box_text == self.tr('FirstName'):
            self.lineEdit_First_Name.setText(info)
        elif box_text == self.tr('MiddleName'):
            self.lineEdit_Middle_Name.setText(info)
        elif box_text == self.tr('LastName'):
            self.lineEdit_Last_Name.setText(info)
        elif box_text == self.tr('BirthDate'):
            if self.comboBox_Scene.currentText() != '身份证':
                separators = {'yyyy-MM-dd': '-', 'yyyy.MM.dd': '.', 'yyyy/MM/dd': '/'}
                separator = separators.get(self.comboBox_Date_Format.currentText())
                parts = info.split(separator) if separator is not None else []
                if len(parts) == 3:
                    year, month, day = (int(part) for part in parts)
                    self.dateEdit.setDate(QtCore.QDate(year, month, day))
        elif box_text == self.tr('Age'):
            self.lineEdit_Age.setText(info)
        elif box_text == self.tr('Gender'):
            if info == self.tr('Male'):
                chosen = self.radioButton_Male
            elif info == self.tr('Female'):
                chosen = self.radioButton_Female
            else:
                chosen = self.radioButton_Unkown
            chosen.setChecked(True)
        elif box_text == self.tr('Address'):
            # self.lineEdit_Address.setText(info)
            pass
        elif box_text == self.tr('Department'):
            # self.lineEdit_Department.setText(info)
            pass
    except Exception as e:
        self.logger.critical('ocr result exception ' + str(e))
def closeEvent(self, evt):
    """Handle the window close request.

    In full-screen mode a close triggered by a forbidden key (e.g. Alt+F4) is
    ignored. Otherwise the user is informed about running tasks, the camera
    is released, the three config files are saved, the token and gRPC
    connection threads are stopped and logging is shut down.

    :param evt: the close event; ``ignore()`` is called on it only on the
        forbidden-key path.
    """
    if self.full_screen_flag and self.forbidden_key_triggered:  # closing via Alt+F4 is disabled in full-screen mode
        self.forbidden_key_triggered = False
        evt.ignore()
        return
    if self.current_tasks > 0:
        QtWidgets.QMessageBox.information(self, self.tr('Confirm'),
                                          self.tr('There are tasks running, '
                                                  'make sure they are finished before shutting down Windows'),
                                          QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
    self.release_camera()
    self.save_school_setting()
    self.save_as_default()
    self.save_third_setting()
    # if self.task_thread.isRunning():
    #     self.task_thread.terminate()
    #     self.task_thread.wait(200)
    # if self.update_database_thread.isRunning():
    #     self.update_database_thread.terminate()
    #     self.update_database_thread.wait(200)
    # Other methods treat token_thread as optional (hasattr checks), so it is
    # looked up defensively here too instead of raising AttributeError on close.
    token_thread = getattr(self, 'token_thread', None)
    if token_thread is not None and token_thread.isRunning():
        token_thread.terminate()
        token_thread.wait(200)
    grpc_thread = getattr(self, 'grpc_connection_thread', None)
    if grpc_thread and grpc_thread.isRunning():
        grpc_thread.terminate()
        grpc_thread.wait(200)
    self.logger.info('Close App')
    shutdown_log()
    # evt.accept()  # the close function only hides the window
# def show_at_desktop_center(self):
# screen_rect: QtCore.QRect = QtWidgets.QApplication.desktop().screenGeometry()
# app_rect: QtCore.QRect = self.geometry()
# app_rect.moveCenter(screen_rect.center())
# self.move(app_rect.topLeft())
# self.centered = True
def center_ui(self):
    """Resize the window to fit the camera pixmap and centre it on the screen.

    The window takes the pixmap width and the pixmap height plus the dock
    height plus 50 when both fit inside the screen geometry; otherwise it
    falls back to 1280x960. ``self.centered`` is set afterwards.
    """
    screen_rect: QtCore.QRect = QtWidgets.QDesktopWidget().screenGeometry()
    pix = self.label_View.pix
    fits_width = pix.width() < screen_rect.width()
    if fits_width and pix.height() + self.dockWidget.height() + 50 < screen_rect.height():
        wanted_width = self.label_View.pix.width()
        wanted_height = self.label_View.pix.height() + self.dockWidget.height() + 50
        self.resize(wanted_width, wanted_height)
    else:
        self.resize(1280, 960)
    # Move the (resized) window so that its centre matches the screen centre.
    frame: QtCore.QRect = self.geometry()
    frame.moveCenter(screen_rect.center())
    self.move(frame.topLeft())
    self.centered = True
def show_notice_box(self):
    """Load the selected scene file and draw its indicator boxes on the view.

    Nothing happens unless the camera is open and
    ``scenes/<current scene>.json`` exists. The file's ``Scale`` is applied
    to ``label_View``, existing boxes are cleared and one ``IndicatorBox``
    is shown per entry of ``Positions``. Any exception is logged.
    """
    if not (self.camera and self.camera.isOpened()):
        return
    scene_config = f'scenes/{self.comboBox_Scene.currentText()}.json'
    if not os.path.exists(scene_config):
        return
    try:
        with open(scene_config, 'r') as f:
            boxes = json.load(f)
        self.label_View.scale = boxes['Scale']
        self.need_reset_pix_pos = True
        self.clear_boxes()
        positions = boxes['Positions']
        for box_name in positions:
            indicator = IndicatorBox(self.label_View, box_name, geometry=positions[box_name],
                                     stylesheet='border-width: 4px;'
                                                'border-style: solid;'
                                                'border-color: #9AFF9A;'
                                                'border-radius:10px;'
                                                'font-size:25px;'
                                                'color:SeaGreen')
            indicator.show()
    except Exception as e:
        self.logger.critical('show notice box exception ' + str(e))
def show_rectangle(self, start, end):
    """Store and display the rectangle spanned by two points.

    :param start: first corner; must provide ``x()`` and ``y()``.
    :param end: opposite corner; must provide ``x()`` and ``y()``.

    ``self.current_box_geometry`` becomes ``[left, top, width, height]`` and
    the same values are written to the geometry line edit.
    """
    x1, y1 = start.x(), start.y()
    x2, y2 = end.x(), end.y()
    left, top = min(x1, x2), min(y1, y2)
    width, height = abs(x1 - x2), abs(y1 - y2)
    self.current_box_geometry = [left, top, width, height]
    summary = self.tr('Start Point: %s, %s; Width: %s; Height: %s') % (left, top, width, height)
    self.lineEdit_Geometry.setText(summary)
def save_new_scene(self):
    """Ask for a scene name and save the current mark boxes under it.

    Shows an error box when there are no mark boxes. The scene (view scale
    rounded to two decimals plus each box's geometry) is written to
    ``scenes/<name>.json``. The name is added to the scene combo box only
    when no scene file of that name existed before, and is then selected.
    Cancelling the dialog or entering an empty name saves nothing.
    """
    if not self.mark_boxes:
        QtWidgets.QMessageBox.critical(self, self.tr('Error'),
                                       self.tr('No box settled'),
                                       QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
        return
    name, ok = QtWidgets.QInputDialog.getText(self, self.tr('New Scene'),
                                              self.tr('Name:'),
                                              QtWidgets.QLineEdit.Normal)
    # An empty name would produce the file 'scenes/.json' and a blank combo entry.
    if not ok or not name:
        return
    # Remove exactly the '.json' suffix. str.rstrip('.json') strips any run of
    # the characters '.', 'j', 's', 'o', 'n' (e.g. 'lesson.json' -> 'le').
    suffix = '.json'
    configs = [filename[:-len(suffix)] for filename in os.listdir('scenes')
               if filename.endswith(suffix)]
    scene = {'Scale': round(self.label_View.scale, 2),
             'Positions': {lb.text(): lb.geometry().getRect() for lb in self.mark_boxes}}
    with open(f'scenes/{name}.json', 'w') as f:
        json.dump(scene, f, indent=4, ensure_ascii=True)
    if name not in configs:
        self.comboBox_Scene.addItem(name)
    self.comboBox_Scene.setCurrentText(name)
    QtWidgets.QMessageBox.information(self, self.tr('Information'),
                                      self.tr('New scene: %s saved') % name,
                                      QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
@property
def mark_boxes(self):
    """Return the ``IndicatorBox`` children of ``label_View`` as a new list."""
    return [child for child in self.label_View.children() if isinstance(child, IndicatorBox)]
def add_box(self, box_name):
    """Add an indicator box named ``box_name`` unless one already exists.

    The new box uses ``self.current_box_geometry`` and is then moved to a
    position derived from the cursor position relative to the window.
    """
    if any(existing.text() == box_name for existing in self.mark_boxes):
        return
    indicator = IndicatorBox(self.label_View, box_name, geometry=self.current_box_geometry,
                             stylesheet='border-width: 4px;'
                                        'border-style: solid;'
                                        'border-color: #9AFF9A;'
                                        'border-radius:10px;'
                                        'font-size:25px;'
                                        'color:SeaGreen')
    indicator.show()
    # Cursor position relative to the window, shifted by the dock height and fixed margins.
    cursor_in_window = QtGui.QCursor.pos() - self.pos()
    indicator.move(cursor_in_window.x() - 10,
                   cursor_in_window.y() - self.dockWidget.height() - 50)
def new_scene(self):
    """Reset the scene editor: name, box selector, geometry text and all boxes."""
    for line_edit in (self.lineEdit_Scene_Name, self.lineEdit_Geometry):
        line_edit.clear()
    self.comboBox_Box.setCurrentIndex(0)
    self.clear_boxes()
def clear_boxes(self):
    """Schedule every current mark box for deletion."""
    for indicator in self.mark_boxes:
        indicator.deleteLater()
def resizeEvent(self, *args, **kwargs):
    """Re-centre the camera pixmap inside ``label_View``.

    Does nothing until ``label_View`` exists and has a ``pix`` attribute.
    The draw position is half of the size difference between the label and
    the pixmap; afterwards the "needs repositioning" flag is cleared.
    """
    if not hasattr(self, 'label_View') or not hasattr(self.label_View, 'pix'):
        return
    view = self.label_View
    # Centre the image in the view.
    margin = (view.size() - view.pix.size()) / 2
    view.draw_pos = QtCore.QPointF(margin.width(), margin.height())
    self.need_reset_pix_pos = False
def pix_resized(self):
    """Flag that the pixmap position must be recomputed before the next repaint."""
    self.need_reset_pix_pos = True
def auto_id(self):
    """Fill the ID line edit with a timestamp-based ID when automatic IDs apply.

    Does nothing unless the "Auto ID" action is checked. It is also skipped while a
    selected patient is being filled in, and in school mode (school screening does
    not use auto IDs).
    """
    if not self.actionAuto_ID.isChecked():
        return
    if self.is_patient_selecting or self.actionActivate_School_Mode.isChecked():
        return
    # Per a product requirement change, the automatic ID follows the device's format
    generated_id = QtCore.QDateTime.currentDateTime().toString('yyyyMMddHHmmsszzz')
    self.lineEdit_ID.setText(generated_id)
def select_patient_to_fill(self, selected_row=None):
    """Copy one row of the patient table into the patient-information input widgets.

    :param selected_row: row index to use; when it is not a non-negative int, the table's
        current row is used instead.

    ``self.is_patient_selecting`` is set to True for the duration of the fill and reset to
    False at the end. Any exception raised while filling is logged and swallowed.
    """
    if self.update_optometry_mode:  # do nothing while optometry data is being updated
        return
    self.is_patient_selecting = True
    if isinstance(selected_row, int) and selected_row >= 0:
        row = selected_row
    else:
        row = self.table_view.currentRow()
    column_num = self.table_view.columnCount()
    # self.current_student_info['Province'] = self.patients_to_show[row].get('province_name', '')
    # self.current_student_info['City'] = self.patients_to_show[row].get('city_name', '')
    # self.current_student_info['County'] = self.patients_to_show[row].get('area_name', '')
    if row >= 0:
        try:
            for col in range(column_num):
                txt = self.table_view.item(row, col).text()
                if col == 0:
                    with QtCore.QSignalBlocker(self.lineEdit_Last_Name):  # block signals while filling
                        self.lineEdit_Last_Name.setText(txt)  # name
                elif col == 1:
                    with QtCore.QSignalBlocker(self.lineEdit_ID):  # block signals while filling
                        self.lineEdit_ID.setText(txt)  # ID
                elif col == 2:
                    gender = txt
                    if gender == self.tr('Male'):
                        self.radioButton_Male.setChecked(True)
                    elif gender == self.tr('Female'):
                        self.radioButton_Female.setChecked(True)
                    else:
                        self.radioButton_Unkown.setChecked(True)
                elif col == 3:
                    # Accept '.', '-' and '/' as date separators
                    txt = txt.replace('.', '/').replace('-', '/')
                    date_info = txt.split('/')
                    if len(date_info) == 3:
                        year, month, day = date_info
                        birth_date = QtCore.QDate(int(year), int(month), int(day))
                        self.dateEdit.setDate(birth_date)
                elif col == 4 and self.table_view.horizontalHeaderItem(col).text() == self.tr('Grade'):
                    self.comboBox_Grade.setCurrentIndex(self.current_grades.index(txt))
                    # Reload the class list so it matches the newly selected grade
                    self.current_classes = self.school_parser.get_classes(self.comboBox_Grade.currentText())
                    self.comboBox_Class.clear()
                    self.comboBox_Class.addItems(self.current_classes)
                elif col == 5 and self.table_view.horizontalHeaderItem(col).text() == self.tr('Class'):
                    self.comboBox_Class.setCurrentIndex(self.current_classes.index(txt))
            # NOTE(review): assumed to run once after the column loop rather than per column — confirm
            if self.actionActivate_School_Mode.isChecked() and 'id_card' in self.patients_to_show[row]:
                # The ID comes from the underlying record, not from the table cell text
                self.current_student_info['ID'] = self.patients_to_show[row].get('id_card').strip()
                self.current_student_info['Name'] = self.table_view.item(row, 0).text().strip()
                self.current_student_info['Gender'] = self.table_view.item(row, 2).text().strip()
                birthday = self.table_view.item(row, 3).text().replace('.', '/').replace('-', '/')
                date_info = birthday.split('/')
                if len(date_info) == 3:
                    year, month, day = date_info
                    self.current_student_info['Birthday'] = [int(year), int(month), int(day)]
                self.current_student_info['Grade'] = self.table_view.item(row, 4).text().strip()
                self.current_student_info['Class'] = self.table_view.item(row, 5).text().strip()
        except Exception as e:
            # print('select_patient_to_fill', e)
            self.logger.critical('fill selected patient exception: ' + str(e))
    self.is_patient_selecting = False
def update_patients_in_orthanc(self, patients):
    """Merge *patients* into the cached list ``self.patients_in_orthanc``.

    Entries that are not dicts are ignored. For a dict whose ``'ID'`` matches a cached
    entry, the cached entry's ``'Studies'`` and ``'MainDicomTags'`` are replaced (with
    ``[]`` / ``{}`` when the incoming dict lacks them). Otherwise the dict is appended.
    """
    for incoming in patients:
        if not isinstance(incoming, dict):
            continue
        incoming_id = incoming.get('ID', '')
        for cached in self.patients_in_orthanc:
            if incoming_id == cached.get('ID', None):
                cached['Studies'] = incoming.get('Studies', [])
                cached['MainDicomTags'] = incoming.get('MainDicomTags', {})
                break
        else:
            # No cached entry carries this ID yet
            self.patients_in_orthanc.append(incoming)
def update_students_on_cloud(self, patients):
    """Store the students returned by a cloud query and sync the grade/location combo boxes.

    :param patients: the queried student records; stored in ``self.patients_on_cloud`` and
        handed to ``self.school_parser``.

    An empty result only shows a warning. Otherwise the students are pushed to the local
    database, the grade combo box is refilled, and the province/city/county combo boxes are
    set from the ids reported by ``self.school_parser``. Each lookup that fails is logged
    and stops the location sync; any exception in that part is logged and swallowed.
    """
    # print(patients)
    self.patients_on_cloud = patients
    self.school_parser.school_data = self.patients_on_cloud
    if len(self.patients_on_cloud) == 0:
        school_name = self.lineEdit_School_Name.text().strip()
        QtWidgets.QMessageBox.warning(self, self.tr('Warning'),
                                      self.tr('No students queried from school: %s') %
                                      '<font color=SpringGreen>%s</font>' % school_name,
                                      QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
        return
    self.init_cloud_students_to_database()  # add the records to the database
    QtWidgets.QMessageBox.information(self, self.tr('Information'),
                                      self.tr('Students have been queried'),
                                      QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
    try:
        self.comboBox_Grade.clear()
        self.current_grades = self.school_parser.get_grades()
        self.comboBox_Grade.addItems(self.current_grades)
        # --- province ---
        province_id = self.school_parser.get_province_id()
        if not province_id:
            self.logger.warning(f'No province id in {patients[0]}')
            return
        province_name = self.nation_location.get_province_name_by_id(province_id)
        if not province_name:
            self.logger.warning(f'Can not get province name by id: {province_id}')
            return
        provinces = self.nation_location.get_province_names()
        self.comboBox_Province.setCurrentIndex(provinces.index(province_name))
        # --- city ---
        if province_name not in location_parse.SPECIAL_CITIES_OR_PROVINCES:
            city_id = self.school_parser.get_city_id()
            if not city_id:
                self.logger.warning(f'No city id in {patients[0]}')
                return
            city_name = self.nation_location.get_city_name_by_id(province_name, city_id)
            if not city_name:
                self.logger.warning(f'Can not get city name of {province_name} by id: {city_id}')
                return
            cities = self.nation_location.get_city_names(province_name)
            if not cities:
                self.logger.warning(f'Can not get cities by province name: {province_name}')
                return
            self.comboBox_City.setCurrentIndex(cities.index(city_name))
        else:
            # For the special entries the province name doubles as the city name
            city_name = province_name
            self.comboBox_City.clear()
            self.comboBox_City.addItems(location_parse.SPECIAL_CITIES_OR_PROVINCES)
            self.comboBox_City.setCurrentIndex(location_parse.SPECIAL_CITIES_OR_PROVINCES.index(city_name))
        # --- county ---
        county_id = self.school_parser.get_county_id()
        if not county_id:
            self.logger.warning(f'No county id in {patients[0]}')
            return
        county_name = self.nation_location.get_county_name_by_id(province_name, city_name, county_id)
        if not county_name:
            self.logger.warning(f'Can not get county name of {province_name} {city_name} by id: {county_id}')
            return
        counties = self.nation_location.get_county_names(province_name, city_name)
        if not counties:
            self.logger.warning(f'Can not get counties of {province_name} {city_name} by city name: {city_name}')
            return
        self.comboBox_County.setCurrentIndex(counties.index(county_name))
    except Exception as e:
        self.logger.critical('update students exception ' + str(e))
def clear_cloud_queried(self):
    """Discard the students obtained from the cloud and empty the grade list and combo box."""
    self.patients_on_cloud.clear()
    self.school_parser.school_data = []
    self.comboBox_Grade.clear()
    self.current_grades.clear()
def query_patients_failed(self, error):
    """Slot for a failed student query: stop the sending thread and show *error* in a dialog.

    :param error: message text displayed in the critical message box.
    """
    # The sender is terminated and waited on before the error is shown
    # NOTE(review): relies on being invoked through a signal so that self.sender() is the thread
    self.sender().terminate()
    self.sender().wait()
    QtWidgets.QMessageBox.critical(self, self.tr('Error'), error,
                                   QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
def get_local_students_data(self):
    """Start a background query of the local database for the entered school's screening students.

    Nothing is started when the school name field is empty. Errors reported by the
    worker are shown in a critical message box.
    """
    school_name = self.lineEdit_School_Name.text().strip()
    if not school_name:
        return

    def show_error(msg):
        QtWidgets.QMessageBox.critical(self, self.tr('Error'), msg,
                                       QtWidgets.QMessageBox.Ok,
                                       QtWidgets.QMessageBox.Ok)

    worker = tasks.DatabaseOperation(self, DATABASE, school=school_name, query_database=True)
    worker.finished.connect(worker.deleteLater)
    worker.error.connect(show_error)
    worker.start()
def query_unchecked_students_on_cloud(self):
    """Query the entered school's students from whichever cloud service is enabled.

    The school name is read from ``lineEdit_School_Name``; an empty name only shows a
    warning. Otherwise the name is stored in ``self.current_school`` and a query thread is
    started for CodeIsLand and/or the Thondar cloud, depending on which actions are checked.
    Results go to ``update_students_on_cloud`` and failures to ``query_patients_failed``.

    The "please wait" notice is created only when at least one query thread is going to
    start. Only a finishing query thread removes the notice, so creating it unconditionally
    left it on screen forever when no cloud token was available.
    """
    school = self.lineEdit_School_Name.text().strip()
    if not school:
        QtWidgets.QMessageBox.warning(self, self.tr('Warning'),
                                      self.tr('Please input school name first!'),
                                      QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
        return
    self.current_school = school

    use_code_island = self.action_enable_codeisland.isChecked()
    use_thondar = self.action_enable_thondar.isChecked()
    thondar_ready = use_thondar and bool(self.cloud_token)

    notice = None
    if use_code_island or thondar_ready:
        notice = Notice(self, '<font color=SpringGreen>%s</font>'
                        % self.tr('Querying students, please wait...'))
        notice.show()

    def launch(query_thread):
        # Shared wiring for both cloud back ends; only called when the notice exists
        query_thread.got_students.connect(self.update_students_on_cloud)
        query_thread.query_failed.connect(self.query_patients_failed)
        query_thread.finished.connect(query_thread.deleteLater)
        query_thread.finished.connect(notice.deleteLater)
        query_thread.start()

    # Use the CodeIsLand cloud
    if use_code_island:
        code_island = self.third_config['CodeIsLand']
        url = code_island['Domain'] + '/' + code_island['QueryAPI']
        launch(tasks.GetStudentsFromCodeIsLand(self,
                                               url=url,
                                               school=school,
                                               app_id=code_island['AppID'],
                                               app_secret=code_island['AppSecret'],
                                               failed_msg=self.tr('Can not get students information from %s!')
                                               % '<font color=SpringGreen>%s</font>' % school,
                                               timeout=code_island['QueryTimeout']))

    if thondar_ready:
        # Query the not-yet-examined students from the cloud platform
        school_mode = self.config['SchoolMode']
        launch(tasks.PostOrGetStudentsOnCloud(self, school_mode['QueryIPAddress'],
                                              school_mode['QueryPort'],
                                              data={'schoolName': school, 'tokenCode': self.cloud_token},
                                              interface='/Home/GetScreeningInfoBySchoolName',
                                              failed_msg=self.tr('Can not get students information from %s!')
                                              % '<font color=SpringGreen>%s</font>' % school,
                                              timeout=school_mode['QueryTimeout']))
    elif use_thondar:
        QtWidgets.QMessageBox.warning(self, self.tr('Warning'),
                                      self.tr('Cloud Token is not available, please try later!'),
                                      QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
def query_optometry_students_on_cloud(self):
    """Query the cloud platform for the entered school's students whose optometry data is to be updated.

    Shows a warning and stops when the school name is empty or no cloud token is
    available. Otherwise the school name is stored in ``self.current_school`` and a query
    thread is started; results go to ``update_students_on_cloud`` and failures to
    ``query_patients_failed``.
    """
    school = self.lineEdit_School_Name.text().strip()
    if not school:
        QtWidgets.QMessageBox.warning(self, self.tr('Warning'),
                                      self.tr('Please input school name first!'),
                                      QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
        return
    self.current_school = school
    if not self.cloud_token:
        QtWidgets.QMessageBox.warning(self, self.tr('Warning'),
                                      self.tr('Cloud Token is not available, please try later!'),
                                      QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
        return
    school_mode = self.config['SchoolMode']
    worker = tasks.PostOrGetStudentsOnCloud(self, school_mode['QueryIPAddress'],
                                            school_mode['QueryPort'],
                                            data={'schoolName': school, 'tokenCode': self.cloud_token},
                                            interface='/Home/GetAlreadyScreenBySchoolName',
                                            failed_msg=self.tr('Can not get students information from %s!')
                                            % '<font color=SpringGreen>%s</font>' % school,
                                            timeout=school_mode['QueryTimeout'])
    worker.got_students.connect(self.update_students_on_cloud)
    worker.query_failed.connect(self.query_patients_failed)
    worker.finished.connect(worker.deleteLater)
    worker.start()
@staticmethod
def optometry_has_checked_valid(student_info: dict):
    """Return True when any eye record of *student_info* holds a real optometry value.

    Every entry of ``student_info['eyeList']`` (missing key counts as empty) is scanned.
    The keys ``'exam_study_eye_id'`` and ``'eye_type'`` never count. The diopter/vision
    keys listed below do not count while their value, as a string, equals
    ``EMPTY_OPTOMETRY_DATA``. Any other value different from ``''`` makes the result True.
    """
    ignored_keys = ('exam_study_eye_id', 'eye_type')
    placeholder_keys = ('spherical_diopter', 'ok_glass_degree', 'naked_vision', 'wear_glass_vision',
                        'equivalent_spherical_diopter')

    def counts_as_data(eye, key):
        if key in ignored_keys:
            return False
        if key in placeholder_keys and str(eye[key]) == EMPTY_OPTOMETRY_DATA:
            return False
        return eye[key] != ''

    return any(counts_as_data(eye, key)
               for eye in student_info.get('eyeList', [])
               for key in eye)
def student_has_stored(self, student_info: dict):  # NOTE: database changed; hidden during screening for now, feature unused
    """Tell whether *student_info* matches a record in ``self.locale_students_in_database``.

    A 10-tuple (id card, name, gender, birthday, province, city, county, school, grade,
    class) is built from *student_info*, using ``''`` for missing keys, and looked up in
    the stored records.

    :return: True or False; None when an exception occurred (it is shown in a message box).
    """
    record_keys = ('id_card', 'name', 'gender', 'birthday', 'province_name', 'city_name',
                   'area_name', 'school_name', 'grade_name', 'class_name')
    try:
        record = tuple(student_info.get(key, '') for key in record_keys)
        return record in self.locale_students_in_database
    except Exception as e:
        QtWidgets.QMessageBox.critical(self, self.tr('Error'), str(e),
                                       QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
def fill_table_with_optometry_students(self):
    """Rebuild the table for optometry-update mode from ``self.patients_to_show``.

    Column 0 holds an "Update"/"Updated" button per student (chosen via
    ``optometry_has_checked_valid``) that opens the optometry dialog. The remaining
    columns show name, ID, gender, birth date, grade and class. An ID recognised by
    ``qt.isChineseCitizenID`` is masked to its first and last three characters. With no
    students the table is emptied.
    """
    table = self.table_view
    row_num = len(self.patients_to_show)
    table.setRowCount(row_num)
    if not row_num:
        table.setColumnCount(0)  # empty the table
    else:
        titles = (self.tr('Optometry Status'), self.tr('Name'), self.tr('ID'), self.tr('Gender'),
                  self.tr('Birth Date'), self.tr('Grade'), self.tr('Class'))
        table.setColumnCount(len(titles))
        header_font = QtGui.QFont()
        header_font.setBold(True)
        for index, title in enumerate(titles):
            header = QtWidgets.QTableWidgetItem(title)
            header.setTextAlignment(QtCore.Qt.AlignCenter)
            header.setFont(header_font)
            table.setHorizontalHeaderItem(index, header)
    table_columns = table.columnCount()
    attributes = ('name', 'id_card', 'gender', 'birthday', 'grade_name', 'class_name')
    for row in range(row_num):
        for col in range(table_columns):
            student = self.patients_to_show[row]
            if col == 0:
                if self.optometry_has_checked_valid(student):
                    status_btn = qt.newButton(self.tr('Updated'), icon='checked', slot=self.open_optometry_dialog)
                else:
                    status_btn = qt.newButton(self.tr('Update'), icon='update', slot=self.open_optometry_dialog)
                table.setCellWidget(row, col, status_btn)
                continue
            # Data columns are shifted by one because of the status button column
            text = student[attributes[col - 1]]
            if col == 2 and qt.isChineseCitizenID(text):
                text = text[:3] + '*' * 12 + text[-3:]
            cell = QtWidgets.QTableWidgetItem(text)
            cell.setTextAlignment(QtCore.Qt.AlignCenter)
            table.setItem(row, col, cell)
def fill_table_with_unchecked_students(self):
    """Rebuild the table for screening mode from ``self.patients_to_show``.

    Columns are name, ID, gender, birth date, grade and class. An ID recognised by
    ``qt.isChineseCitizenID`` is masked to its first and last three characters. With no
    students the table is emptied.
    """
    table = self.table_view
    row_num = len(self.patients_to_show)
    table.setRowCount(row_num)
    if not row_num:
        table.setColumnCount(0)  # empty the table
    else:
        titles = (self.tr('Name'), self.tr('ID'), self.tr('Gender'),
                  self.tr('Birth Date'), self.tr('Grade'), self.tr('Class'))
        table.setColumnCount(len(titles))
        header_font = QtGui.QFont()
        header_font.setBold(True)
        for index, title in enumerate(titles):
            header = QtWidgets.QTableWidgetItem(title)
            header.setTextAlignment(QtCore.Qt.AlignCenter)
            header.setFont(header_font)
            table.setHorizontalHeaderItem(index, header)
    table_columns = table.columnCount()
    attributes = ('name', 'id_card', 'gender', 'birthday', 'grade_name', 'class_name')
    for row in range(row_num):
        for col in range(table_columns):
            text = self.patients_to_show[row][attributes[col]]
            if col == 1 and qt.isChineseCitizenID(text):
                text = text[:3] + '*' * 12 + text[-3:]
            cell = QtWidgets.QTableWidgetItem(text)
            cell.setTextAlignment(QtCore.Qt.AlignCenter)
            table.setItem(row, col, cell)
def fill_table_with_patients_in_orthanc(self):
    """Rebuild the table from ``self.patients_to_show`` using each patient's DICOM tags.

    Columns are name, ID, gender, birth date and number of studies. Gender codes
    ``'M'``/``'F'`` are shown as translated "Male"/"Female", anything else as "Unknown".
    An 8-character birth date is shown as ``YYYY/MM/DD``; other lengths are logged and
    shown unchanged. With no patients the table is emptied.
    """
    table = self.table_view
    row_num = len(self.patients_to_show)
    if not row_num:
        table.setColumnCount(0)  # empty the table
        table.setRowCount(0)  # empty the table
    else:
        titles = (self.tr('Name'), self.tr('ID'), self.tr('Gender'),
                  self.tr('Birth Date'), self.tr('Study Times'))
        table.setColumnCount(len(titles))
        header_font = QtGui.QFont()
        header_font.setBold(True)
        for index, title in enumerate(titles):
            header = QtWidgets.QTableWidgetItem(title)
            header.setTextAlignment(QtCore.Qt.AlignCenter)
            header.setFont(header_font)
            table.setHorizontalHeaderItem(index, header)
    table.setRowCount(row_num)
    # Fill the cells
    table_columns = table.columnCount()
    attributes = ('PatientName', 'PatientID', 'PatientSex', 'PatientBirthDate')
    for row in range(row_num):
        for col in range(table_columns):
            patient = self.patients_to_show[row]
            if col == 4:
                text = str(len(patient['Studies']))
            else:
                tags = patient['MainDicomTags']
                text = tags[attributes[col]]
                if col == 2:
                    if text == 'M':
                        text = self.tr('Male')
                    elif text == 'F':
                        text = self.tr('Female')
                    else:
                        text = self.tr('Unknown')
                elif col == 3:
                    if len(text) == 8:
                        text = text[:4] + '/' + text[4:6] + '/' + text[6:]
                    else:
                        self.logger.critical(f"Patient {tags[attributes[1]]} "
                                             f" {tags[attributes[0]]} "
                                             f"birthdate incorrect!")
            cell = QtWidgets.QTableWidgetItem(text)
            cell.setTextAlignment(QtCore.Qt.AlignCenter)
            table.setItem(row, col, cell)
def search_students_on_cloud(self):
    """Slot that filters the cloud students according to the widget that triggered it.

    The triggering widget is taken from ``self.sender()``:

    * grade combo box: reload the class list, then list the students of that grade/class;
    * class combo box or the examined/not-examined radio buttons: list the students of the
      current grade/class;
    * ID or name line edit, or ``cloud_search_trigger``: substring-match the lowercase ID
      and/or name against ``self.patients_on_cloud``.

    The matches replace ``self.patients_to_show`` and the table is refilled for the current
    mode (optometry update or screening). When exactly one student remains, the optometry or
    workflow dialog may be opened for that student (see the checks below).
    """
    sender = self.sender()
    if sender == self.cloud_search_trigger:
        sender.stop()
    if self.is_patient_selecting or not self.actionActivate_School_Mode.isChecked():  # disabled outside school mode
        return
    self.patients_to_show.clear()
    students = []
    # self.reset_patient_info()
    if sender == self.comboBox_Grade:
        self.comboBox_Class.clear()
        self.current_classes = self.school_parser.get_classes(self.comboBox_Grade.currentText())
        self.comboBox_Class.addItems(self.current_classes)
        students = self.school_parser.get_students(self.comboBox_Grade.currentText(),
                                                   self.comboBox_Class.currentText())
    elif sender in (self.comboBox_Class, self.radioButton_Examined,
                    self.radioButton_Not_Examined):  # self.update_database_thread)
        students = self.school_parser.get_students(self.comboBox_Grade.currentText(),
                                                   self.comboBox_Class.currentText())
    elif sender in (self.lineEdit_ID, self.lineEdit_Last_Name, self.cloud_search_trigger):
        id_txt = self.lineEdit_ID.text().strip().lower()
        if not id_txt:  # prevents the workflow dialog from being re-triggered when resetting patient info clears the ID edit
            return
        # Compatible with the Dongyang school-screening QR codes, which combine ID, name, etc. separated by ','
        if ',' in id_txt:
            id_txt = id_txt.split(',')[0]
        name_txt = self.lineEdit_Last_Name.text().strip().lower()
        if id_txt and not name_txt:
            for patient in self.patients_on_cloud:
                if id_txt in patient['id_card'].lower():
                    students.append(patient)
        elif not id_txt and name_txt:
            for patient in self.patients_on_cloud:
                if name_txt in patient['name'].lower():
                    students.append(patient)
        elif id_txt and name_txt:
            for patient in self.patients_on_cloud:
                if name_txt in patient['name'].lower() and id_txt in patient['id_card'].lower():
                    students.append(patient)
        elif not id_txt and not name_txt:
            students = self.school_parser.get_students(self.comboBox_Grade.currentText(),
                                                       self.comboBox_Class.currentText())
    if self.update_optometry_mode:  # the table is filled with different data depending on the mode
        # print(students)
        for student in students:
            # Classify by whether the data has already been updated
            # if self.optometry_has_checked_valid(student) is self.radioButton_Examined.isChecked() \
            #         and student not in self.patients_to_show:
            if student not in self.patients_to_show:
                self.patients_to_show.append(student)
        self.fill_table_with_optometry_students()
        if sender == self.lineEdit_ID and len(self.patients_to_show) == 1:
            self.optometry_dialog.student = self.patients_to_show[0]
            student_id = self.patients_to_show[0].get('id_card')
            # Open the dialog only on an exact (not substring) match with the typed ID
            if student_id == (txt := self.lineEdit_ID.text().strip()):
                self.optometry_dialog.fill_student_info()
                self.optometry_dialog.initialize_table()
                self.optometry_dialog.update_background_thread.stop = True  # stop the background auto-update thread while the dialog is open
                self.optometry_dialog.show()
                if qt.isChineseCitizenID(student_id):  # do not display the citizen ID number
                    self.lineEdit_ID.setText(student_id[:3] + '*' * 12 + student_id[-3:])
            else:
                if txt:
                    self.statusbar.showMessage(self.tr('Can not find student ID: %s') % student_id, 5000)
    else:
        # Screening mode
        for student in students:
            # Classify by whether already photographed  # note: classification checkboxes temporarily removed during screening
            # if self.student_has_stored(student) is self.radioButton_Examined.isChecked() \
            #         and student not in self.patients_to_show:
            if student not in self.patients_to_show:
                self.patients_to_show.append(student)
        self.fill_table_with_unchecked_students()
        if sender == self.cloud_search_trigger and len(self.patients_to_show) == 1 and len(students) >= 1:
            self.select_patient_to_fill(selected_row=0)
            self.work_flow_dialog.student = self.patients_to_show[0]
            student_id = self.patients_to_show[0].get('id_card')
            student_name = self.patients_to_show[0].get('name')
            # Compared against the ID stored in current_student_info, not the line edit text
            if student_id == (txt := self.current_student_info['ID']):
                self.logger.info(f'Find student {student_name}')
                self.work_flow_dialog.province = self.comboBox_Province.currentText()
                self.work_flow_dialog.city = self.comboBox_City.currentText()
                self.work_flow_dialog.county = self.comboBox_County.currentText()
                self.work_flow_dialog.school = self.current_school
                self.work_flow_dialog.fill_student_info()
                self.work_flow_dialog.initialize_table()
                self.work_flow_dialog.show()
                self.work_flow_dialog.tableWidget.setFocus()
                if qt.isChineseCitizenID(student_id):  # do not display the citizen ID number
                    self.lineEdit_ID.setText(student_id[:3] + '*' * 12 + student_id[-3:])
            else:
                if txt:
                    self.logger.warning(f'Can not find student {student_id} {student_name}')
                    self.statusbar.showMessage(self.tr('Can not find student ID: %s') % student_id, 5000)
    # NOTE(review): assumed to apply in both modes (function level) — confirm intended nesting
    if sender == self.lineEdit_ID and len(self.patients_to_show) == 0:
        self.statusbar.showMessage(self.tr('Can not find person by ID: %s') % self.lineEdit_ID.text(), 5000)
def search_patients_in_orthanc(self):
    """Slot that filters the cached Orthanc patients by the ID/name line edits and refills the table.

    Skipped while a patient is being filled in and in school mode. When the Tianjin Eye
    Hospital action is checked, the ID is instead sent to that hospital's query API in a
    background task and the method returns. Otherwise lowercase substring matches on
    ``PatientID`` / ``PatientName`` are collected, limited to
    ``config['orthanc']['MaxShowNumber']`` entries.
    """
    sender = self.sender()
    if sender == self.orthanc_search_trigger:
        sender.stop()
    if self.is_patient_selecting or self.actionActivate_School_Mode.isChecked():  # skip while line edits are filled programmatically, to avoid errors
        return
    # Query logic added for Tianjin Eye Hospital
    # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    if self.action_enable_tianjin_eye_hospital.isChecked():
        id_txt = self.lineEdit_ID.text().strip()
        if not id_txt:
            # QtWidgets.QMessageBox.warning(self, self.tr('Warning'), self.tr('Invalid query id'),
            #                               QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
            return
        tip = Notice(self, '<font color=SpringGreen>%s</font>' % self.tr('Querying information, please wait...'))
        tip.show()
        t = tasks.TianjinEye(self, self.logger,
                             self.custom_config['TianjinEyeHospital']['Domain'] + '/' + \
                             self.custom_config['TianjinEyeHospital']['QueryAPI'],
                             id_txt, timeout=self.custom_config['TianjinEyeHospital']['QueryTimeout'])
        t.finished.connect(tip.close)
        t.error.connect(lambda msg: QtWidgets.QMessageBox.critical(self, self.tr('Error'), msg,
                                                                   QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok))
        t.info_acquired.connect(self.fill_patient_info)
        t.start()
        return
    # ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
    id_txt = self.lineEdit_ID.text().strip().lower()
    name_txt = self.lineEdit_Last_Name.text().strip().lower()
    self.patients_to_show.clear()
    if not self.actionAuto_ID.isChecked():
        # Manual IDs: match on whichever of ID / name has been entered
        if id_txt and not name_txt:
            for patient in self.patients_in_orthanc:
                if id_txt in patient['MainDicomTags']['PatientID'].lower() and \
                        len(self.patients_to_show) < self.config['orthanc']['MaxShowNumber']:
                    self.patients_to_show.append(patient)
        elif not id_txt and name_txt:
            for patient in self.patients_in_orthanc:
                if name_txt in patient['MainDicomTags']['PatientName'].lower() and \
                        len(self.patients_to_show) < self.config['orthanc']['MaxShowNumber']:
                    self.patients_to_show.append(patient)
        elif id_txt and name_txt:
            for patient in self.patients_in_orthanc:
                if name_txt in patient['MainDicomTags']['PatientName'].lower() and \
                        id_txt in patient['MainDicomTags']['PatientID'].lower() and \
                        len(self.patients_to_show) < self.config['orthanc']['MaxShowNumber']:
                    self.patients_to_show.append(patient)
    else:
        # NOTE(review): this else is assumed to pair with the Auto-ID check (match by name only) — confirm
        for patient in self.patients_in_orthanc:
            if name_txt in patient['MainDicomTags']['PatientName'].lower() and \
                    len(self.patients_to_show) < self.config['orthanc']['MaxShowNumber']:
                self.patients_to_show.append(patient)
    self.fill_table_with_patients_in_orthanc()
def change_nation_location(self):
    """Slot that keeps the city/county combo boxes in step with the province/city selection.

    A change of province refills both the city and the county lists. A change of city
    (to a non-empty text) refills only the county list.
    """
    origin = self.sender()
    if origin == self.comboBox_Province:
        self.comboBox_City.clear()
        self.comboBox_County.clear()
        province_name = self.comboBox_Province.currentText()
        self.comboBox_City.addItems(self.nation_location.get_city_names(province_name))
        # The county list follows whichever city is current after the refill
        county_names = self.nation_location.get_county_names(province_name,
                                                             self.comboBox_City.currentText())
        self.comboBox_County.addItems(county_names)
        return
    if origin == self.comboBox_City and self.comboBox_City.currentText() != '':
        self.comboBox_County.clear()
        province_name = self.comboBox_Province.currentText()
        city_name = self.comboBox_City.currentText()
        self.comboBox_County.addItems(self.nation_location.get_county_names(province_name, city_name))
def config_cloud_address(self):
    """Show the address input dialog pre-filled with the configured query IP address and port.

    The dialog's ``address_changed`` signal is routed to ``new_cloud_address``.
    """
    ui = InputAddress(self, self.config['SchoolMode']['QueryIPAddress'],
                      str(self.config['SchoolMode']['QueryPort']))
    ui.address_changed.connect(self.new_cloud_address)
    ui.show()
def new_cloud_address(self, address, port):
    """Store a new cloud query address in the configuration and persist it.

    :param address: new value for ``config['SchoolMode']['QueryIPAddress']``.
    :param port: new port; converted with ``int()`` before being stored.
    """
    self.config['SchoolMode']['QueryIPAddress'] = address
    self.config['SchoolMode']['QueryPort'] = int(port)
    # Hand the updated configuration to the optometry dialog as well
    self.optometry_dialog.config = self.config
    # self.token_thread.server_address = address
    # self.token_thread.server_port = port
    self.save_as_default()
def open_optometry_dialog(self):
    """Open the optometry dialog for the student in the table's current row.

    Does nothing when no row is current. Any exception is logged and swallowed.
    """
    try:
        current_row = self.table_view.currentRow()
        if current_row < 0:
            return
        student_info = self.patients_to_show[current_row]
        self.optometry_dialog.student = student_info
        self.optometry_dialog.fill_student_info()
        self.optometry_dialog.initialize_table()
        self.optometry_dialog.update_background_thread.stop = True  # stop the background auto-update thread while the dialog is open
        self.optometry_dialog.show()
        self.table_view.setCurrentItem(None)  # deselect the row
    except Exception as e:
        self.logger.critical(str(e))
def update_optometry_on_cloud(self, success, student_info):
    """Reflect the result of an optometry upload on the student's status button in the table.

    :param success: whether the upload succeeded.
    :param student_info: the student record the upload was for.

    On success the button shows "Updated" or "Update", depending on
    ``optometry_has_checked_valid``. On failure it shows "Not updated". The outcome is
    logged in both cases.

    ``self.patients_to_show`` is cleared and rebuilt on every search, so *student_info*
    may no longer be listed when the result arrives. In that case there is no button to
    update; the situation is logged and the method returns instead of raising
    ``ValueError`` from ``list.index``.
    """
    try:
        row = self.patients_to_show.index(student_info)
    except ValueError:
        self.logger.warning(f'Optometry update result for a student not shown in the table, '
                            f'exam study id: {student_info.get("exam_study_id", "")}')
        return
    btn = self.table_view.cellWidget(row, 0)
    if not isinstance(btn, QtWidgets.QPushButton):
        return
    if success:
        if self.optometry_has_checked_valid(student_info):
            btn.setIcon(qt.newIcon('checked'))
            btn.setText(self.tr('Updated'))
        else:
            btn.setIcon(qt.newIcon('update'))
            btn.setText(self.tr('Update'))
        self.logger.info(f'Updated optometry data, '
                         f'exam study id: {student_info.get("exam_study_id", "")}')
    else:
        btn.setIcon(qt.newIcon('warning'))
        btn.setText(self.tr('Not updated'))
        self.logger.info(
            f'Update optometry data failed, exam study id: {student_info.get("exam_study_id", "")}')
def open_workflow_dialog(self):
    """Open the screening workflow dialog for the student in the table's current row.

    Only active in school mode and not in optometry-update mode. Does nothing when no row
    is current. Any exception is logged and swallowed.
    """
    if not self.actionActivate_School_Mode.isChecked():  # the dialog is not shown outside school screening
        return
    if self.update_optometry_mode:
        return
    self.logger.info(f'Open workflow ')
    try:
        current_row = self.table_view.currentRow()
        if current_row < 0:
            return
        student_info = self.patients_to_show[current_row]
        self.work_flow_dialog.student = student_info
        # Pass the currently selected location and school to the dialog
        self.work_flow_dialog.province = self.comboBox_Province.currentText()
        self.work_flow_dialog.city = self.comboBox_City.currentText()
        self.work_flow_dialog.county = self.comboBox_County.currentText()
        self.work_flow_dialog.school = self.current_school
        self.work_flow_dialog.fill_student_info()
        self.work_flow_dialog.initialize_table()
        self.work_flow_dialog.show()
        self.work_flow_dialog.tableWidget.setFocus()
        self.table_view.setCurrentItem(None)  # deselect the row
    except Exception as e:
        self.logger.critical('open_workflow_dialog: ' + str(e))
def init_cloud_students_to_database(self):
    """Start a background task that records the cloud students of the entered school in the database.

    Nothing is started when the school name is empty or no cloud students are held.
    When the task finishes, the local student data is queried via
    ``get_local_students_data`` (local data is only queried after cloud data has been
    obtained). Errors reported by the task are shown in a critical message box.
    """
    school_name = self.lineEdit_School_Name.text().strip()
    if not school_name or not self.patients_on_cloud:
        return

    def show_error(msg):
        QtWidgets.QMessageBox.critical(self, self.tr('Error'), msg,
                                       QtWidgets.QMessageBox.Ok,
                                       QtWidgets.QMessageBox.Ok)

    worker = tasks.DatabaseOperation(self, DATABASE, school_name, patients_on_cloud=self.patients_on_cloud,
                                     init_school_students=True)
    worker.finished.connect(worker.deleteLater)
    worker.finished.connect(self.get_local_students_data)
    worker.error.connect(show_error)
    worker.start()
def eye_nurse_service(self):
    """Start or stop the eye-nurse broadcast and receive threads to match the current settings.

    Not enabled on non-Chinese systems. The threads run only while both school mode and
    the eye-nurse action are checked: the broadcast thread is started when a device serial
    number is known and it is not already running, the receive thread when it is not
    already running. In every other case running threads are terminated (the receive
    thread's server is closed first), waiting up to 200 ms each.
    """
    if not self.is_chinese_system:
        return
    broadcaster = self.eye_nurse_broadcast_thread
    receiver = self.eye_nurse_receive_thread
    if self.actionActivate_School_Mode.isChecked() and self.action_enable_eye_nurse.isChecked():
        if self.msi_device_sn and not broadcaster.isRunning():
            broadcaster.msi_device_sn = self.msi_device_sn
            broadcaster.start()
        if not receiver.isRunning():
            # self.eye_nurse_receive_thread.local_ip = self.network_ip
            receiver.start()
        return
    # Service not wanted: shut down whatever is still running
    if broadcaster.isRunning():
        broadcaster.terminate()
        broadcaster.wait(200)
    if receiver.isRunning():
        receiver.server.close()
        receiver.terminate()
        receiver.wait(200)
def change_third_instruments_states(self):
    """Slot that enables or disables the FR8900 refractometer or the VAT300 vision chart.

    The triggering action (from ``self.sender()``) decides which instrument is affected.
    Its checked state is stored in ``config['SchoolMode']`` and controls the visibility of
    the matching status labels. Enabling starts connecting/searching through the workflow
    dialog; disabling stops the finder thread and resets the status labels.
    """
    if self.sender() == self.action_enable_fr8900:
        self.config['SchoolMode']['RefractometerRequired'] = \
            (enable_fr8900 := self.action_enable_fr8900.isChecked())
        self.label_Refractometer.setVisible(enable_fr8900)
        self.label_Refractometer_Status.setVisible(enable_fr8900)
        if enable_fr8900:
            # Connect only when no serial connection is held yet
            if self.work_flow_dialog.ser_fr8900 is None:
                self.work_flow_dialog.connect_fr8900()
        else:
            self.work_flow_dialog.ser_fr8900 = None
            if self.work_flow_dialog.refractometer_finder_thread.isRunning():
                self.work_flow_dialog.refractometer_finder_thread.terminate()
                self.work_flow_dialog.refractometer_finder_thread.wait(200)
            # NOTE(review): log and label reset assumed to run whether or not the thread was running — confirm
            self.logger.info('Stop connecting FR8900')
            self.label_Refractometer.setPixmap(self.red_dot)
            self.label_Refractometer_Status.setText(self.tr('%s Not Ready') % self.tr('Refractometer'))
    if self.sender() == self.action_enable_vat300:
        self.config['SchoolMode']['VisualChartRequired'] = \
            (enable_vat300 := self.action_enable_vat300.isChecked())
        self.label_Visual_Chart.setVisible(enable_vat300)
        self.label_Visual_Chart_Status.setVisible(enable_vat300)
        if enable_vat300:
            self.work_flow_dialog.start_searching_vision_chart()
        else:
            if self.work_flow_dialog.vision_chart_finder_thread.isRunning():
                self.work_flow_dialog.vision_chart_finder_thread.terminate()
                self.work_flow_dialog.vision_chart_finder_thread.wait(200)
            # NOTE(review): log and label reset assumed to run whether or not the thread was running — confirm
            self.logger.info('Stop searching VAT300')
            self.label_Visual_Chart.setPixmap(self.red_dot)
            self.label_Visual_Chart_Status.setText(self.tr('%s Not Ready') % self.tr('Vision Chart'))
def change_student_query_cloud(self, action: QtWidgets.QAction):
    """Switch the student-query back end between the Thondar cloud and CodeIsLand.

    :param action: the triggered action; ``action_enable_thondar`` selects the Thondar
        cloud, any other action selects CodeIsLand.

    Selecting Thondar starts the token thread if it is not running. Selecting CodeIsLand
    clears the cloud token and terminates the token thread if it is running.
    """
    school_mode = self.config['SchoolMode']
    token_worker = self.token_thread
    if action != self.action_enable_thondar:
        school_mode['UseThondarCloud'] = False
        school_mode['UseCodeIsLand'] = True
        self.cloud_token = ''
        if token_worker.isRunning():
            token_worker.terminate()
            token_worker.wait(200)
        return
    school_mode['UseThondarCloud'] = True
    school_mode['UseCodeIsLand'] = False
    if not token_worker.isRunning():
        token_worker.start()
def restart_transfer(self):
    """Restart the Transfer service after the user confirms.

    Starts a ``tasks.WindowsTaskManager`` with ``WorkType.RestartTransfer``.
    A notice is shown when the task starts and closed when it finishes; the
    ``job_done`` / ``failed`` messages are reported in message boxes.
    """
    box = QtWidgets.QMessageBox
    answer = box.question(self, self.tr('Confirm'),
                          self.tr('Do you confirm to restart Transfer service?'),
                          box.Yes | box.No,
                          box.Yes)
    if answer != box.Yes:
        return

    def report_done(msg):
        box.information(self, self.tr('Information'), msg, box.Ok, box.Ok)

    def report_failure(msg):
        box.critical(self, self.tr('Error'), msg, box.Ok, box.Ok)

    banner = Notice(self, '<font color=SpringGreen>%s</font>' % self.tr('Restarting Transfer service, please wait...'))
    worker = tasks.WindowsTaskManager(self, tasks.WorkType.RestartTransfer)
    worker.started.connect(banner.show)
    worker.finished.connect(banner.close)
    worker.job_done.connect(report_done)
    worker.failed.connect(report_failure)
    worker.start()
def restart_instrument(self):
    """Restart the instrument after the user confirms.

    Starts a ``tasks.WindowsTaskManager`` with
    ``WorkType.RestartInstrumentQuickly`` for the configured instrument IP.
    When the task starts, the gRPC connection thread is terminated and its
    ``failed`` signal is emitted; when the task finishes, the gRPC
    connection is restarted.
    """
    box = QtWidgets.QMessageBox
    answer = box.question(self, self.tr('Confirm'),
                          self.tr('Do you confirm to restart instrument?'),
                          box.Yes | box.No,
                          box.Yes)
    if answer != box.Yes:
        return

    def report_done(msg):
        box.information(self, self.tr('Information'), msg, box.Ok, box.Ok)

    def report_failure(msg):
        box.critical(self, self.tr('Error'), msg, box.Ok, box.Ok)

    banner = Notice(self, '<font color=SpringGreen>%s</font>' % self.tr('Restarting instrument, please wait...'))
    worker = tasks.WindowsTaskManager(self, tasks.WorkType.RestartInstrumentQuickly)
    worker.instrument_ip = self.config['Instrument']['IP']

    # Slots on the same signal are connected in the original order.
    for on_start in (banner.show,
                     self.grpc_connection_thread.terminate,
                     self.grpc_connection_thread.failed.emit):
        worker.started.connect(on_start)
    for on_finish in (banner.close, self.grpc_connection_restart):
        worker.finished.connect(on_finish)
    worker.job_done.connect(report_done)
    worker.failed.connect(report_failure)
    worker.start()
def grpc_connection_restart(self):
    """Flag the gRPC link as disconnected and start its thread if it is idle."""
    worker = self.grpc_connection_thread
    worker.connected = False
    if worker.isRunning():
        # A running thread is left alone.
        return
    self.logger.info('Restart grpc connection')
    worker.start()
def failure_report(self):
    """Bring the 'Export Logs' window to the foreground, or launch 'Export Logs.exe'."""
    self.logger.info('Start Export Logs app')
    running, handles = window_operation.is_window_opened_by_title('Export Logs')
    if not running:
        self.logger.info('Export Logs is not opened')
        exe_path = os.path.join(tasks.here, 'Export Logs.exe')
        # The path is quoted because it contains a space.
        launched = QtCore.QProcess.startDetached(f'"{exe_path}"')
        if not launched:
            self.logger.critical(f'Failed to launch {exe_path}')
        else:
            self.logger.info(f'Launch {exe_path}')
        return

    self.logger.info('Export Logs is already opened')
    for handle in handles:
        window_operation.set_window_foreground_by_hwnd(handle)
def data_manager(self):
    """Bring the 'MSI Data Manager' window to the foreground, or launch its executable."""
    self.logger.info('Start MSI Data Manager')
    running, handles = window_operation.is_window_opened_by_title('MSI Data Manager')
    if not running:
        self.logger.info('MSI Data Manager is not opened')
        exe_path = os.path.join(tasks.here, 'MSI Data Manager.exe')
        # The path is quoted because it contains spaces.
        launched = QtCore.QProcess.startDetached(f'"{exe_path}"')
        if not launched:
            self.logger.critical(f'Failed to launch {exe_path}')
        else:
            self.logger.info(f'Launch {exe_path}')
        return

    self.logger.info('MSI Data Manager is already opened')
    for handle in handles:
        window_operation.set_window_foreground_by_hwnd(handle)
def exit2windows_password(self):
    """Prompt for the password and compare it with the keyword for today.

    The keyword comes from ``engineering_staff.generate_keyword_by_date()``
    and the typed text is upper-cased before comparison. An earlier variant
    that first required an instrument SN is disabled.

    :return: ``True`` when the typed password matches, ``False`` when the
        dialog is cancelled or the password is wrong (an error box is shown
        in the latter case).
    """
    expected = engineering_staff.generate_keyword_by_date()
    typed, accepted = QtWidgets.QInputDialog.getText(self, self.tr('Password'), self.tr('Please input password'),
                                                     echo=QtWidgets.QLineEdit.Normal)
    if not accepted:
        return False
    if typed.upper() == expected:
        return True
    QtWidgets.QMessageBox.critical(self, self.tr('Error'), self.tr('Password is not correct!'),
                                   QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
    return False
def exit2windows(self):
    """Run the ``WorkType.ExitToWindows`` task once the password check succeeds.

    The window is closed when the task emits ``app_close``; a failure
    message is reported in an error box.
    """
    if not self.exit2windows_password():
        return

    def report_failure(msg):
        QtWidgets.QMessageBox.critical(self, self.tr('Error'), msg,
                                       QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)

    worker = tasks.WindowsTaskManager(self, tasks.WorkType.ExitToWindows)
    worker.failed.connect(report_failure)
    worker.app_close.connect(self.close)
    worker.start()
def record_instrument_sn(self):
    """Store the instrument SN and model in the registry when they differ from the stored values.

    Nothing happens while either the SN or the model is still empty.
    """
    serial, model = self.msi_device_sn, self.msi_device_model
    if not (serial and model):
        return

    hive = window_operation.CURRENT_USER_HKEY
    reg_path = window_operation.RegPathKey
    stored_serial = window_operation.get_info_from_reg(hive, reg_path, window_operation.SNKey)
    stored_model = window_operation.get_info_from_reg(hive, reg_path, window_operation.ModelKey)
    if stored_serial == serial and stored_model == model:
        # The registry is already up to date.
        return

    self.logger.info(f'Record instrument SN: {serial} and Model: {model} into register')
    window_operation.set_string_to_reg(hive, reg_path, window_operation.SNKey, serial)
    window_operation.set_string_to_reg(hive, reg_path, window_operation.ModelKey, model)
def msi_report(self):
    """Bring MSI Reporting to the foreground, or launch it.

    The report starter timer is stopped first. When launching, ``--full``
    is appended to the command line if this window runs with the
    full-screen flag.
    """
    self.report_starter_timer.stop()  # stop the timer
    self.logger.info('Start MSI Report')
    running, handles = window_operation.is_window_opened_by_title('MSI Reporting.exe')
    if running:
        self.logger.info('MSI Report is already opened')
        for handle in handles:
            window_operation.set_window_foreground_by_hwnd(handle)
        return

    self.logger.info('MSI Report is not opened')
    report_exe = os.path.join(tasks.here, '../MSIReport/MSI Reporting.exe')
    if not os.path.exists(report_exe):
        QtWidgets.QMessageBox.critical(self, self.tr('Error'), self.tr('%s is not existed') % report_exe,
                                       QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
        return

    command = f'"{report_exe}"'
    if self.full_screen_flag:
        command += ' --full'
    if not QtCore.QProcess.startDetached(command):
        self.logger.critical(f'Failed to launch {report_exe}')
    else:
        self.logger.info(f'Launch {report_exe}')
def order_glasses(self):
    """Show the glasses ordering page, launching Edge in app mode if needed.

    The glass order timer is stopped first. If an Edge window whose title
    contains the shop name exists, it is brought to the foreground.
    Otherwise Edge is started maximized with ``--app=<GlassesOrderAddress>``
    and a dedicated ``--user-data-dir`` below ``USER_CONFIG_FOLDER``.

    Fixes over the previous version: the ``--user-data-dir`` value is now
    properly closed with a quote, the profile folder is created together
    with missing parents, the unreachable second existence check of the
    Edge executable is gone, and an error raised by ``ShellExecute`` is
    logged instead of propagating out of the slot.
    """
    self.glass_order_timer.stop()
    self.logger.info('Start Order Glasses page')
    opened, hwnd_list = window_operation.is_window_opened_by_title('msedge.exe')
    found = False
    if opened:
        self.logger.info('Edge is already opened')
        for h in hwnd_list:
            if '盛达视光耗材' in win32gui.GetWindowText(h):
                window_operation.set_window_foreground_by_hwnd(h)
                found = True
    if found:
        return

    self.logger.info('Edge is not opened')
    edge_exe = window_operation.EDGE_EXE
    if not os.path.exists(edge_exe):
        self.logger.critical(f'{edge_exe} is not existed!')
        QtWidgets.QMessageBox.critical(self, self.tr('Error'),
                                       self.tr('%s is not existed') % edge_exe,
                                       QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
        return

    user_config_folder = os.path.join(USER_CONFIG_FOLDER, 'Edge')
    # exist_ok avoids a race with a concurrent creation; parents are created as well.
    os.makedirs(user_config_folder, exist_ok=True)

    # Both quotes around the folder are required: the path may contain spaces.
    parameters = f'--app={self.config["GlassesOrderAddress"]} --user-data-dir="{user_config_folder}"'
    try:
        result = win32api.ShellExecute(None, 'open', edge_exe, parameters,
                                       '', win32con.SW_SHOWMAXIMIZED)
    except win32api.error as err:
        # NOTE(review): pywin32 is believed to raise on failure rather than return a code <= 32 - confirm.
        self.logger.critical(f'failed to launch edge: {err}')
        return
    if result > 32:
        self.logger.info('Launched edge')
    else:
        self.logger.critical('failed to launch edge')
    # A disabled alternative launched Edge through QProcess.startDetached and
    # then polled for the Edge window handle to maximize it.
def windows_shutdown_or_restart(self, shutdown=False, restart=False):
    """Shut down or restart Windows after the user confirms.

    :param shutdown: request a shutdown; takes precedence over *restart*.
    :param restart: request a restart.

    Returns without doing anything when neither flag is set. The
    confirmation box defaults to "No".
    """
    if not shutdown and not restart:
        return

    if shutdown:
        question = self.tr('Do you confirm to shutdown system?')
        command = 'shutdown /s /t 0'
    else:
        question = self.tr('Do you confirm to restart system?')
        command = 'shutdown /r /t 0'

    box = QtWidgets.QMessageBox
    answer = box.question(self, self.tr('Confirm'), question,
                          box.Yes | box.No,
                          box.No)
    if answer == box.Yes:
        os.system(command)
def add_top_toolbar(self, title, actions=None):
    """Create a ``ToolBar`` titled *title*, add it to the top tool bar area and return it.

    :param title: title passed to ``ToolBar``.
    :param actions: optional actions added through ``qt.addActions``;
        skipped when empty or ``None``.
    :return: the created tool bar.
    """
    bar = ToolBar(title)
    if actions:
        qt.addActions(bar, actions)
    self.addToolBar(QtCore.Qt.TopToolBarArea, bar)
    return bar
def network_status(self, msg: str, connected: bool):
    """Update the network indicator label.

    :param msg: text set as the tooltip of the indicator.
    :param connected: selects the connected or the not-connected pixmap.
    """
    indicator = self.system_status_label.label_network_status
    icon = self.network_connected_pix if connected else self.network_not_connected_pix
    indicator.setToolTip(msg)
    indicator.setPixmap(icon)
def add_usb_driver(self, partitions: list):
    """Add a "Remove <partition>" menu entry for every inserted USB partition.

    :param partitions: names of the inserted partitions; nothing happens
        when the list is empty.

    If the removal menu was empty before this call, a "Remove USB" action
    that pops the menu up at the cursor is also added to ``self.tools``.
    """
    if not partitions:
        return
    self.logger.info(f'Inserted USB partitions: {partitions}')
    no_icon = len(self.usb_driver_remove_menu.actions()) == 0
    for p in partitions:
        # Bind the partition as a default argument: a plain closure over the
        # loop variable would make every entry remove the LAST partition.
        # ``_checked`` absorbs the bool a ``triggered`` signal may pass.
        a = qt.newAction(self.usb_driver_remove_menu, self.tr('Remove %s') % p,
                         slot=lambda _checked=False, partition=p:
                         self.usb_driver_thread.remove_driver(partition))
        a.setObjectName(p)
        self.usb_driver_remove_menu.addAction(a)
    if no_icon:
        action = qt.newAction(self, self.tr('Remove USB'), icon='USB',
                              slot=lambda: self.usb_driver_remove_menu.exec_(QtGui.QCursor.pos()))
        self.tools.addAction(action)
        # After addAction the object changes, so it can only be fetched back from tools.
        self.action_popup_usb_driver_remove_menu = self.tools.actions()[-1]
def remove_usb_driver(self, partitions: list):
    """Remove the menu entries of unplugged USB partitions.

    :param partitions: names of the removed partitions; nothing happens
        when the list is empty.

    Once the removal menu has no entries left, the popup action is removed
    from ``self.tools`` (only if it is a ``QWidgetAction``) and the stored
    reference is cleared.
    """
    if not partitions:
        return

    menu = self.usb_driver_remove_menu
    for name in partitions:
        # Entries are matched by object name; only the first match is removed.
        entry = next((act for act in menu.actions() if act.objectName() == name), None)
        if entry is not None:
            menu.removeAction(entry)

    if len(menu.actions()) != 0:
        return
    self.logger.info('All USB driver removed')
    if isinstance(self.action_popup_usb_driver_remove_menu, QtWidgets.QWidgetAction):
        self.tools.removeAction(self.action_popup_usb_driver_remove_menu)
        self.action_popup_usb_driver_remove_menu = None
def nativeEvent(self, eventType, message):
    """Inspect native Windows messages to detect Alt+F4.

    On ``WM_SYSKEYDOWN`` with ``VK_F4`` the ``forbidden_key_triggered`` flag
    is set and ``(False, 0)`` is returned; every other message is passed to
    the base implementation.
    See https://learn.microsoft.com/zh-cn/windows/win32/inputdev/wm-syskeydown
    """
    native = ctypes.wintypes.MSG.from_address(int(message))
    alt_f4_pressed = native.message == win32con.WM_SYSKEYDOWN and native.wParam == win32con.VK_F4
    if not alt_f4_pressed:
        return super().nativeEvent(eventType, message)
    # NOTE(review): the flag is presumably checked by the close handling - confirm.
    self.forbidden_key_triggered = True
    return False, 0
# def changeEvent(self, event):
# if event.type() == QtCore.QEvent.WindowStateChange:
# if self.windowState() & QtCore.Qt.WindowMinimized:
# self.setWindowState(QtCore.Qt.WindowMaximized)
# event.ignore()
# """
# 这里插入最小化窗口的处理逻辑代码
# """
# #self.close()
# return
#
# return super(MainWindow, self).changeEvent(event)
def seal_windows(self):
    """Seal or unseal Windows, depending on the full-screen flag.

    With the full-screen flag set, the unseal path runs: it requires an
    instrument SN and a code matching
    ``engineering_staff.generate_keyword_by_date('', sn)`` (case-insensitive).
    Otherwise the seal path runs: a warning is shown and the user must type
    the displayed confirmation key. Either task may request a Windows
    restart through ``restart_windows_request``.
    """

    def launch(work_type, banner_text):
        # Shared wiring of the seal/unseal task.
        banner = Notice(self, '<font color=SpringGreen>%s</font>' % banner_text)
        worker = tasks.WindowsTaskManager(self, work_type)
        worker.started.connect(banner.show)
        worker.finished.connect(banner.close)
        worker.failed.connect(
            lambda msg: QtWidgets.QMessageBox.critical(self, self.tr('Error'), msg,
                                                       QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok))
        worker.restart_windows_request.connect(lambda: self.windows_shutdown_or_restart(restart=True))
        worker.start()

    if not self.full_screen_flag:
        # seal
        QtWidgets.QMessageBox.warning(self, self.tr('Warning'),
                                      self.tr('This operation will seal windows, and can not get into the windows desktop again!\n'
                                              'After windows sealed, if you want to recover the windows desktop, please contact Thondar Service!'),
                                      QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
        key = engineering_staff.generate_keyword_by_datetime()
        typed, accepted = QtWidgets.QInputDialog.getText(self, self.tr('Operation confirm'),
                                                         self.tr('Please input "%s" to confirm your operation!') %
                                                         '<font color=SpringGreen>%s</font>' % key)
        if accepted and typed.upper() == key:
            launch(tasks.WorkType.SealWindows, self.tr('Seal windows...'))
        return

    # unseal
    if not self.msi_device_sn:
        QtWidgets.QMessageBox.critical(self, self.tr('Error'), self.tr('No Instrument SN found!'),
                                       QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
        return
    key = engineering_staff.generate_keyword_by_date('', self.msi_device_sn)
    typed, accepted = QtWidgets.QInputDialog.getText(self, self.tr('Input password'),
                                                     self.tr('Please input unseal code!'),
                                                     echo=QtWidgets.QLineEdit.Normal)
    if accepted and typed.upper() == key.upper():
        launch(tasks.WorkType.UnsealWindows, self.tr('Unseal windows...'))
def synchronize_time(self):
    """Synchronize time between instrument and PC after the user confirms.

    Starts a ``tasks.WindowsTaskManager`` with ``WorkType.SynchronizeTime``
    for the configured instrument IP and reports its result or failure in
    a message box.
    """
    box = QtWidgets.QMessageBox
    answer = box.question(self, self.tr('Confirm'),
                          self.tr('Synchronize time between instrument and PC,'
                                  ' this operation needs to restart instrument software,'
                                  ' do you confirm to proceed?'),
                          box.Yes | box.No,
                          box.Yes)
    if answer != box.Yes:
        return

    def report_done(msg):
        box.information(self, self.tr('Information'), msg, box.Ok, box.Ok)

    def report_failure(msg):
        box.critical(self, self.tr('Error'), msg, box.Ok, box.Ok)

    banner = Notice(self, '<font color=SpringGreen>%s</font>' % self.tr('Restarting instrument, please wait...'))
    worker = tasks.WindowsTaskManager(self, tasks.WorkType.SynchronizeTime)
    worker.instrument_ip = self.config['Instrument']['IP']
    worker.started.connect(banner.show)
    worker.finished.connect(banner.close)
    worker.job_done.connect(report_done)
    worker.failed.connect(report_failure)
    worker.start()
def show_instrument_information(self):
    """Show an "about" box with instrument and software version details.

    Model and SN rows are included only when both values are known. A
    leading ``V`` is stripped from the software and algorithm versions.
    """
    row = '<p>%s<font color=SpringGreen>%s</font></p>'
    rows = []
    if self.msi_device_model and self.msi_device_sn:
        rows.append(row % (self.tr('Instrument Model: '), self.msi_device_model))
        rows.append(row % (self.tr('Instrument SN: '), self.msi_device_sn))
    rows.append(row % (self.tr('Instrument Software Version: '), self.device_sw_version.lstrip('V')))
    rows.append(row % (self.tr('Instrument Algorithm Version: '), self.device_algorithm.lstrip('V')))
    rows.append(row % (self.tr('MSI Utilities Version: '), window_operation.MSI_Utilities_Version))
    QtWidgets.QMessageBox.about(self, self.tr('Instrument Information'), ''.join(rows))
def import_trial_code(self):
    """Open the trial code widget for the current instrument SN.

    When the widget emits ``import_trial_code``, the code is handed to
    ``trial_code_result`` and the widget is closed.
    """
    dialog = ImportTrialCodeWidget(self, self.msi_device_sn)
    for slot in (self.trial_code_result, dialog.close):
        dialog.import_trial_code.connect(slot)
    dialog.show()
def trial_code_result(self, code: str):
    """Start the ``WorkType.ImportTrialCode`` task for the given trial code.

    :param code: trial code assigned to the task.

    When the task starts, the gRPC connection thread is terminated and its
    ``failed`` signal is emitted; when the task finishes, the gRPC
    connection is restarted.
    """
    box = QtWidgets.QMessageBox

    def report_done(msg):
        box.information(self, self.tr('Information'), msg, box.Ok, box.Ok)

    def report_failure(msg):
        box.critical(self, self.tr('Error'), msg, box.Ok, box.Ok)

    banner = Notice(self, '<font color=SpringGreen>%s</font>' % self.tr('Import trial code, please wait...'))
    worker = tasks.WindowsTaskManager(self, tasks.WorkType.ImportTrialCode)
    worker.instrument_ip = self.config['Instrument']['IP']
    worker.trial_code = code

    # Slots on the same signal are connected in the original order.
    for on_start in (banner.show,
                     self.grpc_connection_thread.terminate,
                     self.grpc_connection_thread.failed.emit):
        worker.started.connect(on_start)
    for on_finish in (banner.close, self.grpc_connection_restart):
        worker.finished.connect(on_finish)
    worker.job_done.connect(report_done)
    worker.failed.connect(report_failure)
    worker.start()
def check_update_status(self):
    """Start the ``WorkType.CheckUpdateStatus`` task.

    The task receives the instrument IP and the updater settings from
    ``self.third_config['Updater']``. Progress messages replace the notice
    text, and ``update_software_selected`` is forwarded to
    ``update_software``.
    """
    box = QtWidgets.QMessageBox

    def report_done(msg):
        box.information(self, self.tr('Information'), msg, box.Ok, box.Ok)

    def report_failure(msg):
        box.critical(self, self.tr('Error'), msg, box.Ok, box.Ok)

    def show_progress(msg):
        banner.label.setText('<font color=SpringGreen>%s</font>' % msg)

    banner = Notice(self, '<font color=SpringGreen>%s</font>' % self.tr('Checking update information...'))
    worker = tasks.WindowsTaskManager(self, tasks.WorkType.CheckUpdateStatus)
    worker.instrument_ip = self.config['Instrument']['IP']

    updater = self.third_config['Updater']
    worker.query_url_of_cloud = updater['Domain'] + '/' + updater['QueryAPI']
    worker.app_id = updater['AppID']
    worker.app_secret = updater['AppSecret']
    worker.updater_query_timeout = updater['QueryTimeout']

    worker.started.connect(banner.show)
    worker.finished.connect(banner.close)
    worker.update_software_selected.connect(self.update_software)
    worker.job_done.connect(report_done)
    worker.update_msg.connect(show_progress)
    worker.failed.connect(report_failure)
    worker.start()
def update_software(self, instrument_info, utilities_info, updater_info, instrument_package_profile_id):
    """Ask for confirmation and start the ``WorkType.UpdateSoftware`` task.

    :param instrument_info: download info of the instrument package.
    :param utilities_info: download info of the utilities package.
    :param updater_info: download info of the updater package.
    :param instrument_package_profile_id: profile id passed on to the task.

    When the task starts, the gRPC connection thread is terminated and its
    ``failed`` signal is emitted; when the task finishes, the gRPC
    connection is restarted. The window closes on ``app_close``.
    """
    box = QtWidgets.QMessageBox
    answer = box.question(self, self.tr('Confirm'),
                          self.tr('Detected new version, do you want to perform updates?'),
                          box.Yes | box.No, box.Yes)
    if answer != box.Yes:
        return
    box.information(self, self.tr('Information'),
                    self.tr('The updating operation needs Administrator Authorization to run, '
                            'so please accept the Authorization Request when it pops up.\n'
                            'The instrument and PC will restart when the update finished.'),
                    box.Ok, box.Ok)

    def report_done(msg):
        box.information(self, self.tr('Information'), msg, box.Ok, box.Ok)

    def report_failure(msg):
        box.critical(self, self.tr('Error'), msg, box.Ok, box.Ok)

    def show_progress(msg):
        banner.label.setText('<font color=SpringGreen>%s</font>' % msg)

    banner = Notice(self, '<font color=SpringGreen>%s</font>' % self.tr('Updating, please wait...'))
    worker = tasks.WindowsTaskManager(self, tasks.WorkType.UpdateSoftware)

    # Task parameters: cloud settings first, then the package details.
    updater = self.third_config['Updater']
    worker.updated_report_url_of_cloud = updater['Domain'] + '/' + updater['UpdatedReportAPI']
    worker.instrument_ip = self.config['Instrument']['IP']
    worker.app_id = updater['AppID']
    worker.app_secret = updater['AppSecret']
    worker.instrument_package_download_info = instrument_info
    worker.utilities_package_download_info = utilities_info
    worker.updater_package_download_info = updater_info
    worker.instrument_package_profile_id = instrument_package_profile_id
    worker.download_package_timeout = updater['DownloadTimeout']

    # Slots on the same signal are connected in the original order.
    for on_start in (banner.show,
                     self.grpc_connection_thread.terminate,
                     self.grpc_connection_thread.failed.emit):
        worker.started.connect(on_start)
    for on_finish in (banner.close, self.grpc_connection_restart):
        worker.finished.connect(on_finish)
    worker.app_close.connect(self.close)
    worker.job_done.connect(report_done)
    worker.update_msg.connect(show_progress)
    worker.failed.connect(report_failure)
    worker.start()
def change_custom_status(self):
    """Persist a Tianjin Eye Hospital option toggle to the registry.

    Acts on whichever of the two option actions emitted the signal. The
    checked state is stored as the string ``'True'`` or ``'False'`` and is
    only written when it differs from the stored value. Enabling the
    hospital mode additionally shows an information box.
    """
    if self.sender() == self.action_enable_tianjin_eye_hospital:
        action = self.action_enable_tianjin_eye_hospital
        reg_name = window_operation.TianjinEyeHospital
        announce = True
    elif self.sender() == self.action_tianjin_eye_hospital_confirm_automatically:
        action = self.action_tianjin_eye_hospital_confirm_automatically
        reg_name = window_operation.TianjinEyeHospitalConfirm
        announce = False
    else:
        return

    stored = window_operation.get_info_from_reg(window_operation.CURRENT_USER_HKEY,
                                                window_operation.CustomPathKey,
                                                reg_name)
    wanted = 'True' if action.isChecked() else 'False'
    if stored == wanted:
        return
    window_operation.set_string_to_reg(window_operation.CURRENT_USER_HKEY, window_operation.CustomPathKey,
                                       reg_name, wanted)
    if announce and action.isChecked():
        QtWidgets.QMessageBox.information(self, self.tr('Information'),
                                          self.tr('Scanning code input enabled, local patients can only be queried by name'),
                                          QtWidgets.QMessageBox.Ok, QtWidgets.QMessageBox.Ok)
def fill_patient_info(self, info: dict):
    """Fill the patient form from *info* and optionally confirm it.

    :param info: record read with the keys ``'ID'``, ``'Name'``,
        ``'BirthDate'`` (milliseconds since epoch, anything ``int()``
        accepts) and ``'Gender'`` (``'Male'``, ``'Female'`` or
        ``'Unknown'``).

    ID and name are set with the line edits' signals blocked. A missing ID
    or name is shown as empty text, and a missing or invalid birth date is
    logged and leaves the date editor unchanged, instead of raising inside
    the slot. When both Tianjin Eye Hospital options are checked,
    ``confirm_new`` is called at the end.
    """
    # Only field names are logged: the values are personal data.
    self.logger.info(f'Fill patient info, fields: {list(info)}')

    with QtCore.QSignalBlocker(self.lineEdit_ID):
        self.lineEdit_ID.setText(info.get('ID') or '')
    with QtCore.QSignalBlocker(self.lineEdit_Last_Name):
        self.lineEdit_Last_Name.setText(info.get('Name') or '')

    try:
        birth_ms = int(info.get('BirthDate'))
    except (TypeError, ValueError):
        self.logger.critical('Patient info has no valid BirthDate, birth date is not updated')
    else:
        self.dateEdit.setDateTime(QtCore.QDateTime.fromMSecsSinceEpoch(birth_ms))

    gender = info.get('Gender')
    if gender == 'Male':
        self.radioButton_Male.setChecked(True)
    elif gender == 'Female':
        self.radioButton_Female.setChecked(True)
    elif gender == 'Unknown':
        self.radioButton_Unkown.setChecked(True)

    if self.action_enable_tianjin_eye_hospital.isChecked() and \
            self.action_tianjin_eye_hospital_confirm_automatically.isChecked():
        self.confirm_new()
if __name__ == '__main__':
    # Check whether the application is already running: if an 'MSI Study.exe'
    # window exists, bring it to the foreground and exit instead of starting
    # a second instance. (An earlier gRPC channel probe is disabled.)
    already_running, running_handles = window_operation.is_window_opened_by_title('MSI Study.exe')
    if already_running:
        for handle in running_handles:
            window_operation.set_window_foreground_by_hwnd(handle)
        sys.exit(0)

    app = QtWidgets.QApplication(sys.argv)
    style_path = os.path.join(tasks.here, 'config/style.qss')
    if os.path.exists(style_path):
        with open(style_path) as style_file:
            app.setStyleSheet(style_file.read())
    # The window is not shown or resized here; earlier manual
    # show/resize/move calls are disabled.
    win = MainWindow(*sys.argv)
    sys.exit(app.exec_())
# pyinstaller -w --clean -i icons/rgb.ico --noconfirm --distpath E:\PackPython -D app.py --name "MSI Study" --add-data=icons;./icons --add-data=language;./language --add-data=scenes;./scenes --add-data=config;./config --add-data=res;./res --version-file study_version.txt
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。