加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
trex.py 8.84 KB
一键复制 编辑 原始数据 按行查看 历史
SUPER_紫电 提交于 2024-04-15 05:45 . 主目录文件
#-------------------------------------------------------------------------------
# Name: TREX
# Purpose: TaskFile Register ExecuteTool
#
# Author: SUPER_紫电 QQ,911344755
#
# Created: 15/09/2022
# Copyright: (c) SUPER_紫电 2022 All rights reserved.
# Licence: <Modified BSD License>
#-------------------------------------------------------------------------------
import os, sys, threading
try:
sys.path.append(os.path.dirname(__file__) + '\\uses')
except:
sys.path.append(os.getcwd() + '\\uses')
import common, mutex, taskfile
from colorconsole import *
from actions import *
winterm = WinTerm()
winterm.set_title('Task File Register Execute Tool')
win_size = get_console_size()
def cls(): os.system('cls')
def list_ata_port(ata_controller=[]):
port_list = {}
print('\n[ATA Controller]')
for i in range(len(ata_controller)):
ident = ''
ata_port = ata_controller[i]
base, alt, dev = ata_port['base'], ata_port['alt'], ata_port['dev']
mutex_name = 'trex_mutex_0x%04x_0x%02x' % (base, dev)
if mutex.exist_mutex(mutex_name) or not mutex.create_mutex(mutex_name):
ident = 'This port is in use.'
else:
taskfile.set_reg_addr(base, alt, dev)
buffer = bytearray(512)
if taskfile.get_identify_data(buffer):
model = taskfile.get_char(buffer, 54, 40).replace('\x00', '')
fw = taskfile.get_char(buffer, 46, 8 ).replace('\x00', '')
serial = taskfile.get_char(buffer, 20, 20).replace('\x00', '')
ident = model + '_' + fw + '_' + serial
port = ' '.join(['0x%04X' % x for x in ata_port.values()])
port_list[str(i)] = port
print('%d - %s - %s' % (i, port, ident))
mutex.release_mutex()
return port_list
def select_ata_port(ata_controller, selection):
ata_port = ata_controller[int(selection)]
base, alt, dev = ata_port['base'], ata_port['alt'], ata_port['dev']
taskfile.set_reg_addr(base, alt, dev)
print('\n[ATA Port]\n'+' '.join(['0x%04X' % x for x in ata_port.values()]))
def set_ata_port():
taskfile.set_ata_controller()
ata_controller = taskfile.ata_controller
winterm.set_title('Task File Register Execute Tool')
while True:
port_list = list_ata_port(ata_controller)
selection = input('Selection: ')
if selection in port_list:
ata_port = ata_controller[int(selection)]
base, alt, dev = ata_port['base'], ata_port['alt'], ata_port['dev']
mutex_name = 'trex_mutex_0x%04x_0x%02x' % (base, dev)
if not mutex.create_mutex(mutex_name):
os.system('cls')
continue
select_ata_port(ata_controller, selection)
get_ident()
updata_ata_status()
break
elif selection.lower() == 'x':
os._exit(0)
else:
os.system('cls')
continue
def set_workdir():
if common.workdir != '':
print('\nCur Work Dir - ' + common.workdir)
path = input('Input Path: ')
if os.path.exists(path):
common.workdir = path.strip()
print('\n[Work Dir]\n' + common.workdir)
else:
try:
os.makedirs(path)
except:
return
common.workdir = path.strip()
print('\n[Work Dir]\n' + common.workdir)
def poll_ata_status():
reg_stat = ['ERR' , 'INDX', 'CORR', 'DRQ', 'DSC', 'DF', 'DRDY', 'BSY']
reg_erro = ['AMNF' , 'TONF', 'ABRT', 'MCR', 'IDNF', 'MC', 'UNC', 'BBK']
while True:
taskfile.get_reg_value()
ata_port = taskfile.ata_port
base, alt, dev = ata_port['base'], ata_port['alt'], ata_port['dev']
dev = 'Master' if dev == 0x40 else 'Slave'
port = '0x%04X|0x%04X|%s' % (base, alt, dev)
rstat = taskfile.cur_status
rerro = taskfile.cur_error
status = ''
for i in range(7,-1,-1):
if taskfile.bit(i, rstat):
status += '|%s'%reg_stat[i]
errors = ''
for i in range(7,-1,-1):
if taskfile.bit(i, rerro):
errors += '|%s'%reg_erro[i]
win_title = 'TREX [%s] [0x%02X%s] [0x%02X%s]' % (port,rstat,status,rerro,errors)
winterm.set_title(win_title)
sleep(1)
def updata_ata_status():
global background_thread
background_thread = threading.Thread(target=poll_ata_status)
background_thread.start()
def thread_run(fun, param=()):
global action_thread
action_thread = threading.Thread(target=fun, args=param)
action_thread.start()
action_thread.join()
def actions_pool(title='', actions={}):
menu = ''
for cmd in actions:
menu += ' %s - %s\n' % (cmd, actions[cmd]['menu'])
menu += ' x - Exit'.ljust(len(actions[cmd]['menu'])+5)
while True:
cprint(title, fore='light_while', back='cyan')
cprint(menu, fore='light_while', back='yellow')
selection = input('Selection: ')
if selection.lower() in actions:
item = actions[selection.lower()]
thread_run(fun=item['fun'], param=item['param'])
continue
if selection.lower() == 'x':
os._exit(0)
#break
else:
os.system('cls')
def start():
def flash():
actions = {
'0': {'fun': flash_read, 'param': (common.workdir,), 'menu': 'Read Flash '},
'1': {'fun': flash_file_check, 'param': (), 'menu': 'Check Flash File'},
'2': {'fun': flash_file_read, 'param': (common.workdir,), 'menu': 'Read Flash File '},
'w': {'fun': flash_write, 'param': (common.workdir,), 'menu': 'Write Flash '},
'f': {'fun': flash_file_write, 'param': (common.workdir,), 'menu': 'Write Flash File'},
}
title = '\n' + (win_size.X//2) * '='
title+= '\n' + 'Flash Actions'.center(win_size.X//2,' ')
title+= '\n' + (win_size.X//2) * '='
actions_pool(title, actions)
def modules():
actions = {
'0': {'fun': res_read, 'param': (common.workdir,), 'menu': 'Read Reserved File '},
'1': {'fun': res_check, 'param': (), 'menu': 'Check Reserved File'},
'2': {'fun': res_read35, 'param': (common.workdir,), 'menu': 'Read File 0x35 '},
'w': {'fun': res_write, 'param': (common.workdir,), 'menu': 'Write Reserved File'},
'o': {'fun': load_ovl_wrfl, 'param': (common.workdir,), 'menu': 'Load Ovl Write File'},
}
title = '\n' + (win_size.X//2) * '='
title+= '\n' + 'Reserved File'.center(win_size.X//2,' ')
title+= '\n' + (win_size.X//2) * '='
actions_pool(title, actions)
def start():
actions = {
'0': {'fun': backup_firmware, 'param': (), 'menu': 'Backup Firmware '},
'1': {'fun': get_ident, 'param': (), 'menu': 'Drive Identify Data'},
'2': {'fun': vscid, 'param': (), 'menu': 'VSCID And Work Dir '},
'3': {'fun': flash, 'param': (), 'menu': 'Flash Actions '},
'4': {'fun': modules, 'param': (), 'menu': 'Reserved File '},
'r': {'fun': spin_down_reset, 'param': (), 'menu': 'Spin Down Reset '},
'd': {'fun': servo_spin_down, 'param': (), 'menu': 'Servo Spin Down '},
'u': {'fun': servo_spin_up, 'param': (), 'menu': 'Servo Spin Up '},
's': {'fun': soft_reset, 'param': (), 'menu': 'Soft Reset '},
'a': {'fun': set_ata_port, 'param': (), 'menu': 'Set ATA Port '},
'w': {'fun': set_workdir, 'param': (), 'menu': 'Set Work Dir '},
'v': {'fun': get_vscstat, 'param': (), 'menu': 'Get VSC Status '},
'm': {'fun': poll_vscstat, 'param': (), 'menu': 'VSC Status Monitor '},
}
title = '\n' + (win_size.X-1) * '='
title+= '\n' + 'Task File Register Execute Tool'.center(win_size.X-1,' ')
title+= '\n' + (win_size.X-1) * '='
actions_pool(title, actions)
start()
#-------------------------------------------------------------------------------
def main():
if not taskfile.init_drv():
print('Driver initialize %s!' % str(bool(taskfile.drv_initialized)).lower())
return
print('Driver initialize %s.' % str(bool(taskfile.drv_initialized)).lower())
path = common.get_startup_dir()
common.tempdir = path + common.tempdir
common.workdir = common.tempdir
if not os.path.exists(common.tempdir):
os.makedirs(common.tempdir)
set_ata_port()
start()
taskfile.deinit_drv()
mutex.release_mutex()
if __name__ == '__main__':
main()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化