代码拉取完成,页面将自动刷新
同步操作将从 hotootop/薅羊毛 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
import PySimpleGUI as sg
import os
import subprocess as sub
import time
from atexit import register
from threading import Thread
from db import Mongo
from base import MyTimer
from time import sleep
from automain import AutoMain
from apps import APPS
from special import specialmain
# Local date as a YYYYMMDD integer, used below as the 'date' value in database
# queries and in log-file names. It is evaluated once, when this module is
# imported, so it does not roll over at midnight while the process keeps running.
today = int(time.strftime('%Y%m%d'))
class UI:
    """PySimpleGUI control panel for the per-phone automation workers.

    The panel lists the phones reported by ``adb devices -l``, shows for each
    phone the cash / work-state / worked-time records read through ``Mongo``
    plus the phone's log file, and starts or stops one worker per phone.

    Key conventions used throughout the class:

    * ``mobiles``   maps model name -> adb serial.
    * ``mobiles_``  maps adb serial -> model name (inverse of ``mobiles``).
    * ``subs``      maps adb serial -> ``subprocess.Popen`` of its worker.
    * ``subs_info`` maps adb serial -> worker output already shown to the user.
    * Widget keys are built from the adb serial: ``_{serial}_start_btn_``,
      ``_{serial}_stop_btn_``, ``_{serial}_info_``, ``_{serial}_tb_`` and
      ``_{serial}_log_``.
    """

    def __init__(self):
        """Select the GUI theme, open the database and create empty registries."""
        self.sg = sg
        self.sg.ChangeLookAndFeel('GreenTan')  # colour theme
        self.mobiles = {}
        self.mobiles_ = {}
        self.apps = list(APPS.keys())
        self.mongo = Mongo('FortuneCat')
        self.subs = {}
        self.subs_info = {}
        self.timer_subs_paused = False
        self.special_subs = {}
        self.special_subs_info = {}
        # While True, ``start`` runs AutoMain on a thread inside this process
        # instead of spawning a subprocess.
        self.debug = True
        self.today = self._today()

    # ------------------------------------------------------------------ #
    # Small pure helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _today():
        """Return the current local date as a ``YYYYMMDD`` integer."""
        return int(time.strftime('%Y%m%d'))

    @staticmethod
    def _parse_adb_devices(lines):
        """Extract ``{model: serial}`` from the output of ``adb devices -l``.

        Args:
            lines: The output split into lines; the first line is treated as
                the header and skipped.

        Returns:
            A dict mapping the value of each ``model:`` field to the first
            field (the serial) of the same line. Blank lines and lines without
            a ``model:`` field are ignored.
        """
        found = {}
        for line in lines[1:]:
            fields = line.split()
            if not fields:
                continue  # blank separator / trailing line
            serial = fields[0]
            for field in fields[1:]:
                if field.startswith('model:'):
                    found[field[len('model:'):]] = serial
        return found

    @staticmethod
    def _content_of(record):
        """Return ``record['content']`` when it is present and truthy, else ``{}``."""
        if record and record.get('content'):
            return record.get('content')
        return {}

    @staticmethod
    def _selected_tasks(signin, period, content):
        """Return the task names whose checkbox is ticked, in display order."""
        choices = (('签到', signin), ('周期', period), ('内容', content))
        return [task for task, ticked in choices if ticked]

    @staticmethod
    def _launch(cmd):
        """Run *cmd* through the shell with stdout and stderr merged into one text pipe.

        NOTE(review): the pipe is only read after the child has exited (see
        ``_get_subs_info``); a very chatty child could block on a full pipe.
        """
        return sub.Popen(cmd,
                         stdin=sub.PIPE,
                         stdout=sub.PIPE,
                         stderr=sub.STDOUT,
                         text=True,
                         shell=True,
                         )

    @staticmethod
    def _kill(proc):
        """Force-kill *proc* and its child processes with ``taskkill``."""
        os.system(f"taskkill /t /f /pid {proc.pid}")

    def _is_running(self, device):
        """Return True when *device* has a registered worker that has not exited."""
        proc = self.subs.get(device)
        return proc is not None and proc.poll() is None

    # ------------------------------------------------------------------ #
    # Widgets
    # ------------------------------------------------------------------ #
    def popup(self, text, title, color, auto_close=True):
        """Show a non-blocking popup; it closes itself after 4 s when *auto_close*."""
        self.sg.popup(text, title=title, text_color=color,
                      auto_close=auto_close, non_blocking=True,
                      auto_close_duration=4)

    def creat_all_btn(self):
        """Build the top toolbar row (global buttons and the single-task controls)."""
        heading_font = ('微软雅黑', 12)
        # NOTE(review): the combo values are model names, but the default below
        # looks like an adb serial, and ``main`` compares the selection against
        # '选择手机' / '选择程序' - confirm which placeholder is intended.
        return [
            self.sg.Button('更新已连接手机', key='_mobiles_btn_'),
            self.sg.Button('全部启动', key='_all_start_btn_'),
            self.sg.Button('全部停止', key='_all_stop_btn_', disabled=True),
            self.sg.Text(' || 添加脚本', font=heading_font),
            self.sg.Input(key='_code_path_'),
            self.sg.FileBrowse('查找'),
            self.sg.OK('确定'),
            self.sg.Text(' || 单刷手机任务', font=heading_font),
            self.sg.Combo(list(self.mobiles.keys()), default_value='dd43c08f',
                          size=(8, 1), key='_mobile_combo_'),
            self.sg.Combo(self.apps, default_value='微信', key='_apps_combo_',
                          size=(8, 1), tooltip='请按enter键'),
            self.sg.Checkbox('签到', default=True, key='_signin_'),
            self.sg.Checkbox('周期', default=True, key='_period_'),
            self.sg.Checkbox('内容', default=True, key='_content_'),
            self.sg.Button('启动', key='_special_start_btn_', disabled=True),
            self.sg.Button('停止', key='_special_stop_btn_', disabled=True),
            self.sg.Text(' '),
            self.sg.Button('调试', key='_debug_', font=heading_font),
        ]

    def creat_mobile_box(self, name, device):
        """Build the frame for one phone: summary line, start/stop buttons, table, log.

        Args:
            name: Model name shown in the frame title.
            device: adb serial, embedded in every widget key of the frame.
        """
        header_row = [
            self.sg.Text('累计赚XX元,昨日赚XX元', key=f'_{device}_info_', size=(40, 1)),
            self.sg.Button('启动', key=f'_{device}_start_btn_'),
            self.sg.Button('停止', key=f'_{device}_stop_btn_', disabled=True),
        ]
        table = self.sg.Table(
            values=[[str(i) for i in range(6)]],
            headings=['软件', '累计收益', '当前现金', '昨日收益',
                      '工作状态', '工作时长'],
            def_col_width=7,
            max_col_width=10,
            # Column widths follow the initial ``values`` only; they are not
            # re-adjusted when the table is updated.
            auto_size_columns=False,
            justification='center',  # left / right / center
            row_height=30,
            key=f'_{device}_tb_',
            font=('微软雅黑', 12),
            text_color='black',
            enable_events=True,
            bind_return_key=True,
            tooltip='This is a table')
        log_view = self.sg.Multiline('运行日志', size=(60, 8), autoscroll=True,
                                     key=f'_{device}_log_')
        log_frame = self.sg.Frame('运行日志', [[log_view]])
        state_frame = self.sg.Frame('工作状态', [[table], [log_frame]])
        return self.sg.Frame(f'手机{name}',
                             [header_row, [self.sg.Column([[state_frame]])]])

    def creat_layout(self, mobiles, cols=3):
        """Arrange one box per phone in a scrollable grid, *cols* boxes per row.

        Args:
            mobiles: Mapping of model name -> adb serial.
            cols: Number of phone boxes per row.
        """
        names = list(mobiles)
        rows = [
            [self.creat_mobile_box(name, mobiles[name]) for name in names[i:i + cols]]
            for i in range(0, len(names), cols)
        ]
        return self.sg.Column(rows, vertical_scroll_only=False,
                              scrollable=True, key='_column_', size=(1920, 1080))

    # ------------------------------------------------------------------ #
    # Data collection
    # ------------------------------------------------------------------ #
    def get_mobiles(self):
        """Refresh ``mobiles`` / ``mobiles_`` from ``adb devices -l`` and store them.

        Newly seen phones are added; phones seen earlier are kept.
        """
        # The context manager closes the pipe once the output has been read.
        with os.popen('adb devices -l') as pipe:
            lines = pipe.readlines()
        self.mobiles.update(self._parse_adb_devices(lines))
        self.mobiles_ = {serial: model for model, serial in self.mobiles.items()}
        self.mongo.replace_data('mobile', {'base': self.mobiles}, {})
        print(self.mobiles)

    def _update_data(self):
        """Collect today's figures and log text for every known phone.

        Returns:
            A 4-tuple of dicts keyed by adb serial:
            ``(total all_cash as str, total get_cash as str,
            table rows, log text)``. Each table row is
            ``[app name, all_cash, now_cash, get_cash, work state, hours]``.
        """
        # Evaluated on every call so the panel keeps following the calendar
        # when it is left running past midnight.
        date = self._today()
        mobile_all_cash = {}
        mobile_get_cash = {}
        all_list_data = {}
        all_log_info = {}
        # Snapshot of the serials: this presumably runs on the timer thread
        # while the GUI thread may refresh ``self.mobiles`` - TODO confirm.
        for device in list(self.mobiles.values()):
            today_cash = self._content_of(
                self.mongo.find_data_list('cash', {'date': date, 'device': device}))
            work_state = self._content_of(
                self.mongo.find_data_list('work_state', {'date': date, 'device': device}))
            worked_time = self._content_of(
                self.mongo.find_data_list('worked_time', {'date': date, 'device': device}))

            total_all = sum(entry.get('all_cash', 0) for entry in today_cash.values())
            total_get = sum(entry.get('get_cash', 0) for entry in today_cash.values())
            mobile_all_cash[device] = str(round(total_all, 1))
            mobile_get_cash[device] = str(round(total_get, 1))

            all_list_data[device] = [
                [name,
                 entry.get('all_cash', 0),
                 entry.get('now_cash', 0),
                 entry.get('get_cash', 0),
                 work_state.get(name, ''),
                 round(worked_time.get(name, 0), 1)]
                for name, entry in today_cash.items()
            ]

            log_path = f'./netcash{device}Logs/{date}_netcash{device}.txt'
            if os.path.exists(log_path):
                # errors='replace': one undecodable byte in a log must not
                # abort the whole refresh.
                with open(log_path, 'r', errors='replace') as f:
                    all_log_info[device] = f.read()
            else:
                all_log_info[device] = '没有找到日志文件'
        return mobile_all_cash, mobile_get_cash, all_list_data, all_log_info

    def thread_update_func(self, window):
        """Collect fresh data and post it to *window* as an ``'_update_'`` event."""
        window.write_event_value('_update_', self._update_data())

    def _get_subs_info(self):
        """Look for a worker that exited with return code 1.

        Workers that exited with code 0 only have their output drained.

        Returns:
            ``(serial, output)`` for the first worker found with return code 1,
            or ``None`` when there is none.
        """
        # Iterate over a snapshot: the GUI thread adds and removes workers.
        for device, proc in list(self.subs.items()):
            if not proc:
                continue
            code = proc.poll()
            if code == 0:
                proc.communicate()
            elif code == 1:
                out, _ = proc.communicate()
                print(f'{device}程序异常结束')
                return device, out
        return None

    def thread_sub_func(self, window):
        """Check the workers and post the result to *window* as a ``'_sub_'`` event."""
        window.write_event_value('_sub_', self._get_subs_info())

    # ------------------------------------------------------------------ #
    # Worker control
    # ------------------------------------------------------------------ #
    def start(self, device, all=False):
        """Start the worker for *device*.

        In debug mode ``AutoMain(device).start`` runs on a daemon thread of this
        process; otherwise ``automain.py`` is spawned as a subprocess and
        registered in ``subs``, unless a live worker is already registered.

        Args:
            device: adb serial of the phone.
            all: True when called from ``all_start``; suppresses the per-phone
                popup because ``all_start`` shows a summary instead.
        """
        if self.debug:
            Thread(target=AutoMain(device).start, daemon=True).start()
        elif not self._is_running(device):
            # Arguments are passed to the command line separated by spaces,
            # not commas.
            cmd = r'cd E:\python\fortunecat_3 && python automain.py {}'.format(device)
            self.subs[device] = self._launch(cmd)
        if not all:
            self.popup(f'{self.mobiles_.get(device)}启动', '正常', 'green')

    def all_start(self):
        """Start every phone that has no live worker and report both groups."""
        launched = []
        already_running = []
        for device in self.mobiles.values():
            name = self.mobiles_.get(device)
            if self._is_running(device):
                already_running.append(name)
            else:
                self.start(device, True)
                launched.append(name)
        if already_running:
            self.popup(f'{already_running} 正在运行,无法启动', '错误', 'red', False)
        if launched:
            self.popup(f"{launched} 启动", '正常', 'green')

    def stop(self, device):
        """Kill the registered worker of *device*, or report that none is registered."""
        proc = self.subs.pop(device, None)
        if proc is not None:
            print(f'杀死进程{proc.pid}')
            self._kill(proc)
            self.popup(f'{self.mobiles_.get(device)}停止', '正常', 'green')
        elif name := self.mobiles_.get(device):
            # Workers started in debug mode are threads and are not registered
            # in ``subs``, so they also end up here.
            self.popup(f'{name}未启动无法停止', '异常', 'red')

    def all_stop(self,):
        """Kill every registered worker and report stopped / not-started phones."""
        print(self.subs)
        stopped = []
        not_started = []
        for device in self.mobiles.values():
            name = self.mobiles_.get(device)
            proc = self.subs.pop(device, None)
            if proc is not None:
                print(device)
                self._kill(proc)
                stopped.append(name)
            elif name:
                not_started.append(name)
        if not_started:
            self.popup(f"{not_started} 未启动无法停止", '错误', 'red', False)
        if stopped:
            self.popup(f'{stopped}停止', '正常', 'green')

    def special_start(self, device, app_name, tasks):
        """Spawn ``automain.py`` for a single app on one phone.

        Args:
            device: Value selected in the phone combo box.
            app_name: Value selected in the app combo box.
            tasks: List of selected task names; it is formatted into the
                command line with ``str.format`` as-is.
        """
        # NOTE(review): this uses the fortunecat_1 directory while ``start``
        # uses fortunecat_3 - confirm that the difference is intended.
        cmd = r'cd E:\python\fortunecat_1 && python automain.py {} {} {}'.format(
            device, app_name, tasks)
        self.special_subs[device] = self._launch(cmd)
        self.popup(f'{self.mobiles_.get(device)} {app_name}启动', '正常', 'green')

    def special_stop(self, device):
        """Placeholder: stopping a single-app run is not implemented.

        TODO: kill ``self.special_subs[device]`` the way ``stop`` kills a
        regular worker, drop it from ``special_subs`` and confirm with a popup.
        """
        pass

    # ------------------------------------------------------------------ #
    # Event loop
    # ------------------------------------------------------------------ #
    def main(self):
        """Build the window and run the blocking event loop until it is closed."""
        self.get_mobiles()
        layout = [self.creat_all_btn(), [self.creat_layout(self.mobiles)]]
        window = self.sg.Window('招财猫', resizable=True,
                                return_keyboard_events=True).Layout(layout)
        # Presumably MyTimer invokes ``mytarget`` periodically (every 1 s / 2 s)
        # on its own thread - TODO confirm against base.MyTimer.
        timer_update = MyTimer(1, mytarget=self.thread_update_func,
                               myargs=(window,), daemon=True)
        timer_subs = MyTimer(2, mytarget=self.thread_sub_func,
                             myargs=(window,), daemon=True)
        # NOTE(review): set on the first single-task start and never reset, so
        # only one single-task run can be launched per session.
        special_out_active = False

        def btn_start(device):
            """Show *device* as running: start disabled, stop enabled."""
            window[f'_{device}_start_btn_'].update(disabled=True)
            window[f'_{device}_stop_btn_'].update(disabled=False)

        def btn_stop(device):
            """Show *device* as stopped: start enabled, stop disabled."""
            window[f'_{device}_start_btn_'].update(disabled=False)
            window[f'_{device}_stop_btn_'].update(disabled=True)

        def resume_sub_monitor():
            """Resume the worker monitor if a crash report had paused it."""
            if self.timer_subs_paused:
                timer_subs.resume()
                self.timer_subs_paused = False

        while True:
            event, values = window.Read()
            # Leave before touching any widget: after the window has been
            # closed there is nothing left to read from.
            if event == self.sg.WIN_CLOSED or event == 'Exit' or event is None:
                break

            special_device = window['_mobile_combo_'].get()
            special_apps_name = window['_apps_combo_'].get()
            if special_device != '选择手机' and special_apps_name != '选择程序':
                window['_special_start_btn_'].update(disabled=False)
            if event == '_special_start_btn_' and not special_out_active:
                special_out_active = True
                tasks = self._selected_tasks(window['_signin_'].get(),
                                             window['_period_'].get(),
                                             window['_content_'].get())
                self.special_start(special_device, special_apps_name, tasks)

            if event == '_mobiles_btn_':
                self.get_mobiles()
                timer_update.start()
                timer_subs.start()
                # Make sure both timers are stopped when the program exits.
                register(timer_subs.stop)
                register(timer_update.stop)
                window['_mobiles_btn_'].update(disabled=True)
            elif event == '_all_start_btn_':
                window['_all_start_btn_'].update(disabled=True)
                window['_all_stop_btn_'].update(disabled=False)
                for device in self.mobiles_:
                    btn_start(device)
                self.all_start()
                # Forget previously reported output for every phone so that a
                # repeated, identical failure is reported again.
                self.subs_info.clear()
                resume_sub_monitor()
            elif event == '_all_stop_btn_':
                window['_all_start_btn_'].update(disabled=False)
                window['_all_stop_btn_'].update(disabled=True)
                for device in self.mobiles_:
                    btn_stop(device)
                self.all_stop()

            for device in self.mobiles.values():
                if event == f'_{device}_start_btn_':
                    btn_start(device)
                    self.start(device)
                    self.subs_info.pop(device, None)
                    resume_sub_monitor()
                elif event == f'_{device}_stop_btn_':
                    btn_stop(device)
                    self.stop(device)

            if event == '_update_':
                mobile_all_cash, mobile_get_cash, all_list_data, all_log_info = values['_update_']
                for device in self.mobiles.values():
                    rows = all_list_data.get(device, [])
                    summary = (f"共{len(rows)}个软件,"
                               f"累计赚{mobile_all_cash.get(device, '')}元, "
                               f"昨日赚{mobile_get_cash.get(device, '')}元")
                    window[f'_{device}_info_'].update(value=summary)
                    window[f'_{device}_tb_'].update(values=rows)
                    window[f'_{device}_log_'].update(value=all_log_info.get(device, ''))
            elif event == '_sub_':
                if values['_sub_']:
                    device, out = values['_sub_']
                    # Output that was already shown is not shown a second time.
                    if out != self.subs_info.get(device):
                        self.subs_info.update({device: out})
                        # Drop the dead worker so it neither hides failures of
                        # other phones nor blocks a restart. The poll() check
                        # protects a worker restarted in the meantime.
                        dead = self.subs.get(device)
                        if dead is not None and dead.poll() is not None:
                            del self.subs[device]
                        btn_stop(device)
                        self.sg.popup_scrolled(
                            out, title=f'{self.mobiles_.get(device)}异常停止',
                            text_color='red')
                        timer_subs.pause()
                        self.timer_subs_paused = True

            if event == '_debug_':
                # NOTE(review): ``debug`` is already True from __init__ and is
                # never set to False anywhere in this class.
                self.debug = True
                self.all_stop()
                for device in self.mobiles_:
                    btn_stop(device)

        timer_subs.stop()
        timer_update.stop()
        window.Close()
if __name__ == '__main__':
    # Script entry point: build the control panel and run its blocking event loop.
    UI().main()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。