加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
ui.py 16.85 KB
一键复制 编辑 原始数据 按行查看 历史
hotootop 提交于 2021-02-02 11:09 . 主要代码
import PySimpleGUI as sg
import os
import subprocess as sub
import time
from atexit import register
from threading import Thread
from db import Mongo
from base import MyTimer
from time import sleep
from automain import AutoMain
from apps import APPS
from special import specialmain
today = int(time.strftime('%Y%m%d'))  # local date as an int in YYYYMMDD form, evaluated once when the module is imported
class UI:
def __init__(self):
    """Create the controller: GUI toolkit handle, database handle and empty state."""
    # The PySimpleGUI module is kept on the instance; all widgets are built through it.
    self.sg = sg
    self.sg.ChangeLookAndFeel('GreenTan')  # colour theme

    # Phone registry, filled by get_mobiles(): `mobiles` maps model name -> adb
    # serial and `mobiles_` is the inverse map (adb serial -> model name).
    self.mobiles, self.mobiles_ = {}, {}

    # Names offered in the "single run" app selector.
    self.apps = list(APPS.keys())
    self.mongo = Mongo('FortuneCat')

    # Worker subprocesses keyed by adb serial, plus the last output already
    # reported for each of them.
    self.subs, self.subs_info = {}, {}
    self.timer_subs_paused = False

    # Same bookkeeping for "single run" subprocesses.
    self.special_subs, self.special_subs_info = {}, {}

    # When true, start() runs AutoMain in a thread instead of a subprocess.
    self.debug = True
    self.today = int(time.strftime('%Y%m%d'))
def popup(self, text, title, color, auto_close=True):
    """Show a non-blocking popup window.

    Args:
        text: Message displayed in the popup.
        title: Popup window title.
        color: Text colour, forwarded as ``text_color``.
        auto_close: Forwarded to ``sg.popup``; the close delay is fixed at
            4 seconds via ``auto_close_duration``.
    """
    options = {
        'title': title,
        'text_color': color,
        'auto_close': auto_close,
        'non_blocking': True,
        'auto_close_duration': 4,
    }
    self.sg.popup(text, **options)
def creat_all_btn(self):
    """Build the toolbar row of the main window.

    Returns:
        A flat list of 17 PySimpleGUI elements, in display order: the
        fleet-wide buttons, the "add script" file picker, the "single run"
        selectors with their start/stop buttons, and the debug button.
    """
    gui = self.sg
    label_font = ('微软雅黑', 12)

    # Buttons that act on every connected phone.
    fleet_controls = [
        gui.Button('更新已连接手机', key='_mobiles_btn_'),
        gui.Button('全部启动', key='_all_start_btn_'),
        gui.Button('全部停止', key='_all_stop_btn_', disabled=True),
    ]

    # "Add script" file picker.
    script_controls = [
        gui.Text(' || 添加脚本', font=label_font),
        gui.Input(key='_code_path_'),
        gui.FileBrowse('查找'),
        gui.OK('确定'),
    ]

    # "Single run": pick one phone, one app and the task kinds to execute.
    # NOTE(review): the default phone value 'dd43c08f' is a hard-coded literal.
    single_run_controls = [
        gui.Text(' || 单刷手机任务', font=label_font),
        gui.Combo(list(self.mobiles.keys()), default_value='dd43c08f',
                  size=(8, 1), key='_mobile_combo_'),
        gui.Combo(self.apps, default_value='微信', key='_apps_combo_',
                  size=(8, 1), tooltip='请按enter键'),
        gui.Checkbox('签到', default=True, key='_signin_'),
        gui.Checkbox('周期', default=True, key='_period_'),
        gui.Checkbox('内容', default=True, key='_content_'),
        gui.Button('启动', key='_special_start_btn_', disabled=True),
        gui.Button('停止', key='_special_stop_btn_', disabled=True),
        gui.Text(' '),
        gui.Button('调试', key='_debug_', font=label_font),
    ]

    return fleet_controls + script_controls + single_run_controls
def get_mobiles(self):
    """Refresh the phone registry from ``adb devices -l``.

    Every output line that carries a ``model:<name>`` field is recorded in
    ``self.mobiles`` as ``{model name: adb serial}``. ``self.mobiles_`` is
    then rebuilt as the inverse map, the registry is written to the
    ``mobile`` collection and printed.

    Entries from earlier calls are kept; a later line with the same model
    name overwrites the earlier serial.
    """
    # Close the pipe deterministically instead of leaving it to the GC.
    with os.popen('adb devices -l') as pipe:
        lines = pipe.readlines()

    for line in lines:
        # split() with no argument collapses the column padding and drops the
        # trailing newline, so a `model:` field at the end of the line stays clean.
        fields = line.split()
        if len(fields) < 2:
            continue  # blank line
        serial = fields[0]
        # Header and daemon-status lines carry no `model:` field, so they
        # fall through without being registered.
        for field in fields[1:]:
            if field.startswith('model:'):
                self.mobiles[field[len('model:'):]] = serial

    self.mobiles_ = {serial: name for name, serial in self.mobiles.items()}
    self.mongo.replace_data('mobile', {'base': self.mobiles}, {})
    print(self.mobiles)
def _update_data(self):
mobile_all_cash = {}
mobile_get_cash = {}
all_list_data = {}
all_log_info = {}
for device in self.mobiles.values():
cash = self.mongo.find_data_list('cash', {'date': today, 'device':device})
state = self.mongo.find_data_list('work_state', {'date': today, 'device':device})
worked = self.mongo.find_data_list('worked_time', {'date': today, 'device':device})
today_cash = {}
work_state = {}
worked_time = {}
if cash and cash.get('content'):
today_cash = cash.get('content')
if state and state.get('content'):
work_state = state.get('content')
if worked and worked.get('content'):
worked_time = worked.get('content')
all_cash = sum([i.get('all_cash',0) for i in today_cash.values()])
get_cash = sum([i.get('get_cash',0) for i in today_cash.values()])
mobile_all_cash.update({device: str(round(all_cash, 1))})
mobile_get_cash.update({device: str(round(get_cash, 1))})
list_data = []
for name in today_cash.keys():
all_cash = today_cash.get(name,{}).get('all_cash', 0)
now_cash = today_cash.get(name, {}).get('now_cash', 0)
get_cash = today_cash.get(name, {}).get('get_cash', 0)
work_state_ = work_state.get(name, '')
how_long = round(worked_time.get(name, 0),1)
list_data.append([name, all_cash, now_cash, get_cash,
work_state_, how_long])
all_list_data.update({device: list_data})
log_path = f'./netcash{device}Logs/{today}_netcash{device}.txt'
if os.path.exists(log_path):
with open(log_path, 'r') as f:
result = f.read()
else:
result = '没有找到日志文件'
all_log_info.update({device: result})
return mobile_all_cash, mobile_get_cash, all_list_data, all_log_info
def thread_update_func(self, window):
    """Timer callback: gather fresh data and hand it to the window.

    The result of ``_update_data()`` is posted with the ``'_update_'`` key
    through ``window.write_event_value``.
    """
    snapshot = self._update_data()
    window.write_event_value('_update_', snapshot)
def _get_subs_info(self):
for device in self.subs.keys():
if sub := self.subs.get(device):
if sub.poll() == 0:
out, err = sub.communicate()
elif sub.poll() == 1:
out, err = sub.communicate()
return device, out
print(f'{device}程序异常结束')
def thread_sub_func(self, window):
    """Timer callback: check the worker subprocesses and notify the window.

    The result of ``_get_subs_info()`` is posted with the ``'_sub_'`` key
    through ``window.write_event_value``.
    """
    status = self._get_subs_info()
    window.write_event_value('_sub_', status)
def start(self, device, all=False):
    """Start the automation worker for one phone.

    In debug mode the worker runs as a daemon thread inside this process and
    is not recorded in ``self.subs``. Otherwise, unless the phone already has
    an entry in ``self.subs``, ``automain.py`` is launched through the shell
    and the process handle is stored under the phone's serial.

    Args:
        device: adb serial of the phone.
        all: True when called as part of a bulk start; suppresses the
            per-phone confirmation popup.
    """
    if self.debug:
        worker = Thread(target=AutoMain(device).start, daemon=True)
        worker.start()
    elif not self.subs.get(device):
        # Shell arguments are separated by spaces, not commas.
        # NOTE(review): the working directory is a hard-coded absolute path.
        command = rf'cd E:\python\fortunecat_3 && python automain.py {device}'
        # NOTE(review): stdout is a pipe that nothing in this block reads
        # while the child is running -- confirm the child cannot stall on it.
        process = sub.Popen(
            command,
            stdin=sub.PIPE,
            stdout=sub.PIPE,
            stderr=sub.STDOUT,
            text=True,
            shell=True,
        )
        self.subs[device] = process

    if all:
        return
    self.popup(f'{self.mobiles_.get(device)}启动', '正常', 'green')
def all_start(self):
    """Start a worker for every known phone that is not already tracked.

    Phones with an entry in ``self.subs`` are skipped and reported in one
    red popup; the phones that were started are reported in one green popup.
    """
    launched = []
    already_running = []
    for device in self.mobiles.values():
        name = self.mobiles_.get(device)
        if device not in self.subs:
            # Bulk mode: start() shows no popup of its own.
            self.start(device, True)
            launched.append(name)
        else:
            already_running.append(name)

    if already_running:
        self.popup(f'{already_running} 正在运行,无法启动', '错误', 'red', False)
    if launched:
        self.popup(f"{launched} 启动", '正常', 'green')
def stop(self, device):
    """Stop the worker subprocess of one phone and report the outcome.

    If ``device`` has an entry in ``self.subs``, its process tree is killed
    with the Windows ``taskkill`` command, the entry is removed and a green
    "stopped" popup is shown. Otherwise, if the phone is known, a red
    "not started, cannot stop" popup is shown.

    Args:
        device: adb serial of the phone.
    """
    process = self.subs.pop(device, None)
    if process is not None:
        # The f prefix is required for the pid to appear in the message.
        print(f'杀死进程{process.pid}')
        # /t kills the whole process tree, /f forces termination.
        os.system(f"taskkill /t /f /pid {process.pid}")
        self.popup(f'{self.mobiles_.get(device)}停止', '正常', 'green')
    elif name := self.mobiles_.get(device):
        # Only reached when nothing was running, so a successful stop is not
        # followed by a contradictory "not started" popup.
        self.popup(f'{name}未启动无法停止', '异常', 'red')
def all_stop(self):
    """Stop the worker subprocess of every known phone and report the outcome.

    For each adb serial in ``self.mobiles``: if it has an entry in
    ``self.subs``, the process tree is killed with the Windows ``taskkill``
    command and the entry is removed; otherwise the phone is listed as not
    running. One red popup lists the phones that were not running and one
    green popup lists the phones that were stopped.
    """
    print(self.subs)
    stopped = []
    not_running = []
    # self.subs is keyed by adb serial, so iterate the serials (the values of
    # self.mobiles), not the model names.
    for device in self.mobiles.values():
        process = self.subs.pop(device, None)
        if process is not None:
            print(f'杀死进程{process.pid}')
            # /t kills the whole process tree, /f forces termination.
            os.system(f"taskkill /t /f /pid {process.pid}")
            stopped.append(self.mobiles_.get(device))
        elif name := self.mobiles_.get(device):
            not_running.append(name)

    if not_running:
        self.popup(f"{not_running} 未启动无法停止", '错误', 'red', False)
    if stopped:
        self.popup(f'{stopped}停止', '正常', 'green')
def special_start(self, device, app_name, tasks):
    """Launch a "single run" of one app on one phone as a subprocess.

    ``automain.py`` is started through the shell with the device, the app
    name and the task list as arguments, and the process handle is stored in
    ``self.special_subs`` under the phone's serial.

    Args:
        device: adb serial of the phone.
        app_name: Name of the app to run.
        tasks: Task names to run; formatted into the command line with
            ``str()``.
    """
    # NOTE(review): the working directory is a hard-coded absolute path, and
    # `tasks` is rendered as a Python list literal inside the shell command --
    # confirm that automain.py parses it in that form.
    command = rf'cd E:\python\fortunecat_1 && python automain.py {device} {app_name} {tasks}'
    process = sub.Popen(
        command,
        stdin=sub.PIPE,
        stdout=sub.PIPE,
        stderr=sub.STDOUT,
        text=True,
        shell=True,
    )
    self.special_subs[device] = process

    # NOTE(review): a "stopped" popup is shown right before the "started"
    # popup; kept as is -- confirm whether it is intentional.
    self.popup('停止', '正常', 'green')
    phone_name = self.mobiles_.get(device)
    self.popup(f'{phone_name} {app_name}启动', '正常', 'green')
def special_stop(self, device):
    """Placeholder for stopping a "single run" subprocess; currently does nothing.

    Args:
        device: adb serial of the phone (unused).
    """
    # Disabled draft of the intended implementation:
    # if sub_ := self.special_subs.get(device):
    #     os.system(f"taskkill /t /f /pid {sub_.pid}")
    #     del self.special_subs[device]
    #     self.popup(f'{self.mobiles_.get(device)} 停止', '正常', 'green')
    # else:
    #     self.popup('系统设计出错', '错误', 'red', False)
    return None
def creat_mobile_box(self, name, device):
    """Build the panel for one phone.

    The panel is a frame titled with the phone name. It holds a header row
    (summary text plus start/stop buttons) and a "work state" frame with the
    per-app table and the log view. Element keys embed ``device`` so the
    event loop can address them.

    Args:
        name: Phone model name, used in the frame title.
        device: adb serial, embedded in the element keys.

    Returns:
        The outer ``Frame`` element.
    """
    gui = self.sg

    header_row = [
        gui.Text('累计赚XX元,昨日赚XX元', key=f'_{device}_info_', size=(40, 1)),
        gui.Button('启动', key=f'_{device}_start_btn_'),
        gui.Button('停止', key=f'_{device}_stop_btn_', disabled=True),
    ]

    table_options = {
        # One placeholder row: '0' .. '5'.
        'values': [list(map(str, range(6)))],
        'headings': ['软件', '累计收益', '当前现金', '昨日收益',
                     '工作状态', '工作时长'],
        'def_col_width': 7,
        'max_col_width': 10,
        # Automatic column sizing is based on the initial values above and is
        # not re-applied on update, so it is switched off.
        'auto_size_columns': False,
        'justification': 'center',  # left / right / center
        'row_height': 30,
        'key': f'_{device}_tb_',
        'font': ('微软雅黑', 12),
        'text_color': 'black',
        'enable_events': True,
        'bind_return_key': True,
        'tooltip': 'This is a table',
    }
    status_table = gui.Table(**table_options)

    log_view = gui.Multiline('运行日志', size=(60, 8), autoscroll=True,
                             key=f'_{device}_log_')
    log_frame = gui.Frame('运行日志', [[log_view]])

    state_frame = gui.Frame('工作状态', [[status_table], [log_frame]])
    body_row = [gui.Column([[state_frame]])]
    return gui.Frame(f'手机{name}', [header_row, body_row])
def creat_layout(self, mobiles, cols=3):
    """Arrange one panel per phone in a scrollable grid.

    Args:
        mobiles: Mapping of phone model name -> adb serial.
        cols: Maximum number of panels per grid row.

    Returns:
        A scrollable ``Column`` element (key ``'_column_'``, size 1920x1080)
        whose rows each hold up to ``cols`` phone panels.
    """
    entries = list(mobiles.items())
    grid = [
        [self.creat_mobile_box(name, device)
         for name, device in entries[offset:offset + cols]]
        for offset in range(0, len(entries), cols)
    ]
    return self.sg.Column(grid, scrollable=True, vertical_scroll_only=False,
                          key='_column_', size=(1920, 1080))
def main(self):
# Builds the main window, creates the two polling timers and runs the event
# loop; after the loop both timers are stopped and the window is closed.
# NOTE(review): this listing has lost its indentation, so the nesting of the
# statements below cannot be read from the text. The comments describe each
# statement on its own and do not assert which branch or loop it belongs to.
self.get_mobiles()
# special_out = self.sg.Output(size=(150, 45))
special_out= None
# Set to True on the first "single run" start; nothing in this method resets it.
special_out_active = False
# First layout row: toolbar; second row: the grid of per-phone panels.
layout = [self.creat_all_btn(), [self.creat_layout(self.mobiles)]]
# Window setup
window = self.sg.Window('招财猫', resizable=True, return_keyboard_events=True).Layout(layout)
# Timers that call thread_update_func / thread_sub_func with the window.
# NOTE(review): MyTimer comes from `base`; the first argument is presumably
# the interval in seconds -- confirm against its definition.
timer_update = MyTimer(1, mytarget=self.thread_update_func, myargs=(window,), daemon=True)
timer_subs = MyTimer(2, mytarget=self.thread_sub_func, myargs=(window,), daemon=True)
# Switch a phone's buttons to the "running" state: start disabled, stop enabled.
def btn_start(device):
window[f'_{device}_start_btn_'].update(disabled=True)
window[f'_{device}_stop_btn_'].update(disabled=False)
# Switch a phone's buttons to the "stopped" state: start enabled, stop disabled.
def btn_stop(device):
window[f'_{device}_start_btn_'].update(disabled=False)
window[f'_{device}_stop_btn_'].update(disabled=True)
while True: # Event loop; without it the window would close after a single read
event, values = window.Read() # event: the window event is determined from the returned value
special_device = window['_mobile_combo_'].get()
special_apps_name = window['_apps_combo_'].get()
# An event value of None means the close button in the top-right corner was clicked
if event == self.sg.WIN_CLOSED or event == 'Exit' or event is None:
break
# Enable the "single run" start button once neither selector shows its placeholder text.
if special_device != '选择手机' and special_apps_name != '选择程序':
window['_special_start_btn_'].update(disabled=False)
if event == '_special_start_btn_' and not special_out_active:
special_out_active = True
# win_special = self.sg.Window('单刷输出内容', [[special_out]])
# Start from all three task kinds and drop the ones whose checkbox is unticked.
tasks = ['签到', '周期', '内容']
if not window['_signin_'].get():
tasks.remove('签到')
if not window['_period_'].get():
tasks.remove('周期')
if not window['_content_'].get():
tasks.remove('内容')
self.special_start(special_device, special_apps_name, tasks)
if special_out_active:
pass
# event_special, values_special = win_special.Read()
# if event_special is None or event_special == 'Exit':
# win_special.close()
if event == '_mobiles_btn_': # Button event received: handle it
self.get_mobiles()
timer_update.start()
timer_subs.start()
# Stop the timer threads when the program exits
register(timer_subs.stop)
register(timer_update.stop)
window['_mobiles_btn_'].update(disabled=True)
elif event == '_all_start_btn_':
window['_all_start_btn_'].update(disabled=True)
window['_all_stop_btn_'].update(disabled=False)
# Iterating self.mobiles_ yields adb serials (its keys).
for device in self.mobiles_:
btn_start(device)
self.all_start()
if self.timer_subs_paused:
self.subs_info.update({device:None})
timer_subs.resume()
self.timer_subs_paused = False
elif event == '_all_stop_btn_':
window['_all_start_btn_'].update(disabled=False)
window['_all_stop_btn_'].update(disabled=True)
for device in self.mobiles_:
btn_stop(device)
self.all_stop()
# Per-phone start/stop buttons are matched by the serial embedded in their key.
for device in self.mobiles.values():
if event == f'_{device}_start_btn_':
btn_start(device)
self.start(device)
if self.timer_subs_paused:
timer_subs.resume()
self.timer_subs_paused = False
elif event == f'_{device}_stop_btn_':
btn_stop(device)
self.stop(device)
# '_update_' carries the 4-tuple returned by _update_data().
if event == '_update_':
mobile_all_cash, mobile_get_cash, all_list_data, all_log_info = values['_update_']
for device in self.mobiles.values():
window[f'_{device}_info_'].update(
value=f"""共{len(all_list_data.get(device, []))}个软件,累计赚{mobile_all_cash.get(device,'')}元, 昨日赚{mobile_get_cash.get(device, '')}元""")
window[f'_{device}_tb_'].update(values=all_list_data.get(device, []))
window[f'_{device}_log_'].update(value=all_log_info.get(device, ''))
# '_sub_' carries the result of _get_subs_info(): a (device, output) pair or None.
elif event == '_sub_':
if values['_sub_']:
device, out = values['_sub_']
# Output equal to the one already stored in subs_info for this phone is skipped.
if out == self.subs_info.get(device):
pass
else:
self.subs_info.update({device: out})
btn_stop(device)
self.sg.popup_scrolled(out, title=f'{self.mobiles_.get(device)}异常停止', text_color='red')
timer_subs.pause()
self.timer_subs_paused = True
# NOTE(review): self.debug is already True after __init__ and no visible code
# sets it to False, so this assignment never changes its value.
if event == '_debug_':
self.debug = True
self.all_stop()
for device in self.mobiles_:
btn_stop(device)
timer_subs.stop()
timer_update.stop()
window.Close()
if __name__ == '__main__':
    # Run the GUI only when this file is executed as a script.
    gui = UI()
    gui.main()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化