加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
automain.py 16.71 KB
一键复制 编辑 原始数据 按行查看 历史
hotootop 提交于 2021-02-02 11:09 . 主要代码
# -*- coding: utf-8 -*-
import time
from time import sleep
import datetime
import sys
from threading import Thread
from atexit import register
from random import shuffle, randint
from copy import deepcopy
from apps import (APPS, APPS_CONTENT, APPS_SIGN, APPS_TIMED, APPS_PEROID,
APPS_NIGHT, APPS_HOUR, APPS_TIME)
from base import ( TaskException, TimedException, SwitchException, DoneException,
PeriodException, DayException, StopException,
NightException, HourException, MyTimer)
from uiautomator2.exceptions import BaseError
from myauto import wraper, MyAuto
from db import Mongo
from mylog import Log
# Module-level logger instance; AutoMain stores it as ``self.log`` and hands
# it to every TaskThread it creates.
log = Log(stream=True)
class MySignal:
    """Minimal single-receiver signal: stores one callback and calls it on emit."""

    def __init__(self):
        # No receiver is registered until connect() is called.
        self.call_back = None

    def connect(self, call_back):
        """Register ``call_back`` as the receiver, replacing any previous one."""
        self.call_back = call_back

    def emit(self, *args, **kargs):
        """Invoke the connected callback with the given arguments.

        The callback's return value is discarded.  Emitting before
        ``connect`` has been called raises ``TypeError`` because the stored
        callback is still ``None``.
        """
        receiver = self.call_back
        receiver(*args, **kargs)
class TaskThread(Thread):
    """Worker thread that runs one task phase on a device.

    The phase is selected by setting ``which_task`` to ``'sign'``, ``'main'``
    or ``'night'`` before calling ``start()``.  Progress is reported back
    through ``task_signal`` as small dicts such as ``{'sign': True}`` or
    ``{'task': 'period'}``.
    """

    # Class-level attribute: every TaskThread instance shares this one
    # MySignal object, and therefore the same connected callback.
    task_signal = MySignal()

    def __init__(self, log, task_control,
                 today_cash, timed_task, mobile_device, the_working_info,
                 work_state):
        """Store the shared state and prepare the device automation handle.

        Args:
            log: Logger used for all progress messages.
            task_control: Dict written as ``task_control[app_name] = task_name``
                before an app's work starts.
            today_cash: Dict keyed by app name; an app already present is
                skipped by the sign phase.
            timed_task: Dict passed through to each app's main object.
            mobile_device: Device identifier given to ``MyAuto`` and used as
                part of the log name.
            the_working_info: Dict updated with ``'the_working_app'`` and
                ``'planed_work_time'`` during the main phase.
            work_state: Dict mapping app name to its state string.
        """
        super().__init__()
        self.u = MyAuto(mobile_device)
        # Hand the automation handle to the watcher decorator.
        wraper.u = self.u
        self.log = log
        self.apps = APPS
        self.apps_sign = APPS_SIGN
        self.apps_content = APPS_CONTENT
        self.apps_period = APPS_PEROID
        self.apps_timed = APPS_TIMED
        self.apps_night = APPS_NIGHT
        self.apps_hour = APPS_HOUR
        self.apps_time = APPS_TIME
        self.task_control = task_control
        self.today_cash = today_cash
        self.timed_task = timed_task
        self.mobile_device = mobile_device
        self.the_working_info = the_working_info
        self.work_state = work_state
        self.which_task = ''
        self.the_working_app = ''
        # Register watchers for these on-screen texts and start watching
        # (presumably to dismiss pop-ups -- confirm against MyAuto).
        self.u.creat_watcher('child', '我知道了')
        self.u.creat_watcher('no_1', '不允许')
        self.u.d.watcher.start()

    def run(self):
        """Thread entry point: run the phase named by ``which_task``.

        Emits ``{'sign': True}`` / ``{'main': True}`` after the sign / main
        phase returns.

        NOTE(review): ``task_main`` and ``task_night`` raise ``DayException``
        when their loop finishes, and nothing here catches it, so it
        propagates out of the thread.  Confirm that this is intended.
        """
        if self.which_task == 'sign':
            self.task_sign()
            self.task_signal.emit({'sign': True})
        elif self.which_task == 'main':
            self.task_main()
            self.task_signal.emit({'main': True})
        elif self.which_task == 'night':
            self.task_night()
        return True

    def task_sign(self):
        """Run the sign-in task once for every app not yet in ``today_cash``.

        The app list is shuffled in place first.  ``TaskException`` from an
        app marks its sign-in as finished and emits ``{'update': True}``.
        """
        shuffle(self.apps_sign)
        self.u.d.watcher.add_mylog(self.log)
        for app_name in self.apps_sign:
            if app_name in self.today_cash:
                self.log.info(f'{app_name} 已签到过,跳过')
                continue
            try:
                self.log.info(f'{app_name} 执行签到')
                self._do_work(app_name, '签到')
            except TaskException:
                self.log.info(f'{app_name} 完成签到')
                self.task_signal.emit({'update': True})
            except BaseError:
                self.log.info(f'没有{app_name},跳过')
        self.log.info('执行完签到任务')
        self.u.remove_watcher('child')

    def task_main(self):
        """Run the content task for every app that is not already finished.

        Returns:
            ``True`` when the loop is cut short by ``DayException`` (after
            emitting ``{'restart': True}``) or ``NightException`` (after
            emitting ``{'night': True}``).

        Raises:
            DayException: once every app in the list has been processed.
        """
        shuffle(self.apps_content)
        self.u.creat_watcher('no_2', '禁止')
        self.u.creat_watcher('no_3', '拒绝')
        self.u.creat_watcher('no_4', '取消')
        # Use the instance logger, as task_sign does, rather than the
        # module-level one.
        self.u.d.watcher.add_mylog(self.log)
        for app_name in self.apps_content:
            self.log.info(f'{app_name} 执行内容任务')
            # Publish which app is being worked on and its planned time.
            self.the_working_info['planed_work_time'] = self.apps_time.get(app_name)
            self.the_working_info['the_working_app'] = app_name
            if self.work_state.get(app_name) == '完成':
                self.log.info(f'{app_name}已完成工作')
                continue
            try:
                self.work_state[app_name] = '正在刷'
                self.try_func(app_name)
            except DoneException:
                self.log.info(f'{app_name} 完成工作')
                # Record that this app is finished.
                self.work_state[app_name] = '完成'
            except DayException:
                self.task_signal.emit({'restart': True})
                return True
            except NightException:
                self.task_signal.emit({'night': True})
                return True
        self.log.info('执行完所有任务')
        raise DayException

    def try_func(self, app_name):
        """Run the content task for ``app_name``, servicing interruptions.

        When the app's work is interrupted by ``TimedException``,
        ``HourException`` or ``PeriodException`` the matching inserted task
        is run; if that task ends with ``SwitchException`` the content task
        is entered again.  A plain ``TaskException`` also re-enters the
        content task.  Any other exception propagates to the caller.

        Implemented as a loop rather than by calling itself from inside the
        handlers, so a long run of interruptions does not deepen the stack.
        """
        while True:
            try:
                # Reset working_task so timed and periodic tasks do not
                # interleave.
                self.task_signal.emit({'task': 'main'})
                self._do_work(app_name, '内容')
            except TimedException:
                announcement, inserted_task = '插入时间任务', self.task_timed
            except HourException:
                announcement, inserted_task = '插入小时任务', self.task_hour
            except PeriodException:
                announcement, inserted_task = '插入周期任务', self.task_period
            except TaskException:
                self.log.info('可能出现内容直接插入定时任务,重新进入任务循环')
                continue
            else:
                return
            self.log.info(announcement)
            try:
                inserted_task()
            except SwitchException:
                # Inserted task finished: go back to the content task.
                continue
            return

    def task_period(self):
        """Run the periodic task for every app in ``apps_period``.

        Raises:
            SwitchException: after the whole list has been processed.
        """
        self.task_signal.emit({'task': 'period'})
        for app_name in self.apps_period:
            try:
                self.log.info(f'{app_name}执行周期任务')
                self._do_work(app_name, '周期')
            except TaskException:
                self.log.info(f'{app_name}完成周期任务')
        self.log.info('全部执行完周期任务')
        raise SwitchException

    def task_timed(self):
        """Run the timed task for every app in ``apps_timed`` (shuffled).

        Raises:
            SwitchException: after the whole list has been processed.
        """
        self.task_signal.emit({'task': 'timed'})
        shuffle(self.apps_timed)
        for app_name in self.apps_timed:
            try:
                self.log.info(f'{app_name} 执行定时任务')
                self._do_work(app_name, '定时')
            except TaskException:
                self.log.info(f'{app_name} 完成定时任务')
        self.log.info('全部执行完定时任务')
        raise SwitchException

    def task_night(self):
        """Run the midnight task for every app in ``apps_night``.

        Raises:
            DayException: when an app raises ``DoneException`` or after the
                whole list has been processed.
        """
        for app_name in self.apps_night:
            try:
                self.log.info(f'{app_name} 执行午夜任务')
                self._do_work(app_name, '午夜')
            except TaskException:
                self.log.info(f'{app_name} 完成午夜任务')
            except DoneException:
                self.log.info('全部执行完午夜任务')
                raise DayException
        self.log.info('全部执行完午夜任务')
        raise DayException

    def task_hour(self):
        """Run the hourly task for every app in ``apps_hour``.

        Raises:
            SwitchException: after the whole list has been processed.
        """
        self.task_signal.emit({'task': 'hour'})
        for app_name in self.apps_hour:
            try:
                self.log.info(f'{app_name} 执行小时任务')
                self._do_work(app_name, '小时')
            except TaskException:
                self.log.info(f'{app_name} 完成小时任务')
        self.log.info('全部执行完小时任务')
        raise SwitchException

    def _do_work(self, app_name, task_name):
        """Set up and run one app's worker for the given task.

        Looks ``app_name`` up in ``self.apps``; the entry is indexed as
        ``[0]`` (app class), ``[1]`` (main class) and ``[2]`` (extra argument
        for the app class).  An unknown app or a ``BaseError`` is logged and
        swallowed; every other exception raised by the worker propagates.
        """
        the_worker = self.apps.get(app_name)
        if not the_worker:
            self.log.info(f'没有软件{app_name}')
            return
        try:
            # Name the log file after the device and the logger after the app.
            self.log.set_name('netcash'+self.mobile_device, app_name)
            self.log.info(f'运行{app_name}')
            # Instantiate the app's automation objects.
            wraper.log = self.log
            self.task_control[app_name] = task_name
            app = the_worker[0](self.u, self.log, the_worker[2])
            if app_name not in self.today_cash:
                self.today_cash.update({app_name: {}})
            app_main = the_worker[1](app, self.task_control, self.today_cash, self.timed_task)
            app_main.main()
        except BaseError:
            self.log.info(f'没有{app_name},跳过')
class AutoMain():
    """Coordinator for one device: owns shared state, timers and task threads.

    It loads persisted state from Mongo, starts a ``TaskThread`` for the
    current phase, reacts to the dicts that thread emits, and on a timer
    tick updates work-time counters and writes control flags into
    ``task_control``.
    """

    # (timed-task name, window opens, window closes) as HHMM integers.
    _TIMED_WINDOWS = (
        ('sleep', 2230, 2300),
        ('awake', 1200, 1250),
        ('dinner', 1700, 1830),
        ('breakfast', 800, 830),
    )

    def __init__(self, mobile_device):
        """Initialise state for ``mobile_device`` and open the Mongo handle."""
        self.mobile_device = mobile_device
        self.log = log
        self.today = int(time.strftime('%Y%m%d'))
        # Stored as int, like ``today``, so a query on yesterday's date has
        # the same type as the dates written through ``self.query``.
        yesterday = datetime.date.today() - datetime.timedelta(days=1)
        self.yesterday = int(yesterday.strftime('%Y%m%d'))
        self.today_cash = {}
        self.the_working_info = {'the_working_app': '', 'planed_work_time': 0}
        self.work_state = {}
        self.worked_time = {}
        self.how_long = 0
        self.hour_note = 0
        self.note_app = {}
        self.task_control = {}
        self.timed_task = {'awake': 0, 'sleep': 0, 'dinner': 0, 'breakfast': 0}
        self.working_task = ''
        self.task_thread = None
        self.app_names = list(APPS)
        self.mongo = Mongo('FortuneCat')
        self.query = {'date': self.today, 'device': self.mobile_device}
        self.night = False

    def _reload_persisted_state(self):
        """Merge stored cash, worked-time and timed-task data into memory."""
        self.affirm_cash_info()
        self.affrim_worked_time()
        self.affrim_timed_task()

    def _spawn_task(self, which_task):
        """Create a new TaskThread for ``which_task``, keep it and start it."""
        self.task_thread = self.creat_thread()
        self.task_thread.which_task = which_task
        self.task_thread.start()

    def task_call_back(self, result):
        """Handle a dict emitted by the task thread.

        Only the first matching key is acted on, in this order: ``'sign'``,
        ``'main'``, ``'update'``, ``'restart'``, ``'night'``, ``'task'``.
        """
        if result.get('sign'):
            self.log.info('签到执行完毕')
            self._spawn_task('main')
        elif result.get('main'):
            self.log.info("全部执行完毕!!!!!")
        elif result.get('update'):
            # Persist cash data so it records which app finished signing in.
            self.save_cash_info()
        elif result.get('restart'):
            # Start over after midnight.
            # NOTE(review): self.today / self.query were computed in __init__
            # and are not refreshed here -- confirm whether the restart should
            # switch to the new date.
            self._reload_persisted_state()
            self._spawn_task('sign')
        elif result.get('night'):
            # Crossing midnight.
            self._spawn_task('night')
            self.night = True
        elif result.get('task'):
            # Remember the running task kind so periodic and timed tasks do
            # not get interleaved.
            self.working_task = result['task']

    def creat_thread(self):
        """Build a TaskThread on the shared state and connect its signal."""
        task_thread = TaskThread(self.log, self.task_control, self.today_cash,
                                 self.timed_task, self.mobile_device, self.the_working_info,
                                 self.work_state)
        task_thread.task_signal.connect(self.task_call_back)
        return task_thread

    def start(self):
        """Load persisted state, start the sign phase and the two timers.

        ``save_worked_info`` is scheduled with interval 60 and
        ``update_count`` with interval 3 (presumably seconds -- confirm
        against MyTimer); both timers are stopped at interpreter exit.
        """
        self._reload_persisted_state()
        self._spawn_task('sign')
        timer_save = MyTimer(60, self.save_worked_info, daemon=True)
        timer_dispath = MyTimer(3, self.update_count, daemon=True)
        timer_save.start()
        timer_dispath.start()
        register(timer_save.stop)
        register(timer_dispath.stop)

    def exit_(self):
        """No-op hook called on every timer tick.

        The remote stop check below is currently disabled.
        """
        pass
        # result = self.mongo.find_data_list('sys_control', {'date':self.today})
        # if result:
        # info = result.get(self.mobile_device)
        # if info == 'stop':
        # self.mongo.update_data('sys_control', {'date':self.today}, {self.mobile_device:'stoped'})
        # the_working_app = self.the_working_info['the_working_app']
        # self.task_control[the_working_app] = '停止'
        # sys.exit()

    def save_cash_info(self):
        """Fill in missing ``get_cash`` values and persist ``today_cash``.

        For every app whose entry is a dict with ``get_cash == 0`` the gain
        is computed as today's ``now_cash`` minus yesterday's stored
        ``now_cash``; a positive gain is stored rounded to one decimal.
        Yesterday's document is fetched at most once per call.
        """
        previous_doc = None
        previous_loaded = False
        # Iterate over a copy: the task thread may add entries concurrently.
        for app_name, info in deepcopy(self.today_cash).items():
            if not isinstance(info, dict) or info.get('get_cash') != 0:
                continue
            if not previous_loaded:
                previous_doc = self.mongo.find_data_list(
                    'cash', {'date': self.yesterday, 'device': self.mobile_device})
                previous_loaded = True
            if not previous_doc:
                continue
            pre_cash = previous_doc.get('content', {}).get(app_name, {}).get('now_cash', 0)
            get_cash = info.get('now_cash', 0) - pre_cash
            if get_cash > 0:
                self.today_cash[app_name]['get_cash'] = round(get_cash, 1)
        self.mongo.update_data('cash', self.query, {'content': self.today_cash})

    def affirm_cash_info(self):
        """Merge the stored 'cash' content for ``self.query`` into ``today_cash``."""
        today_cash_ = self.mongo.find_data_list('cash', self.query)
        if today_cash_ and today_cash_.get('content'):
            self.today_cash.update(deepcopy(today_cash_.get('content')))

    def affrim_worked_time(self):
        """Merge the stored 'worked_time' content into ``worked_time``."""
        worked_info = self.mongo.find_data_list('worked_time', self.query)
        if worked_info and worked_info.get('content'):
            self.worked_time.update(deepcopy(worked_info.get('content')))

    def affrim_timed_task(self):
        """Merge the stored 'timed_task' content into ``timed_task``."""
        timed_task = self.mongo.find_data_list('timed_task', self.query)
        if timed_task and timed_task.get('content'):
            self.timed_task.update(deepcopy(timed_task.get('content')))

    def save_worked_info(self):
        """Write ``worked_time`` and ``work_state`` to their Mongo collections."""
        self.mongo.update_data('worked_time', self.query, {'content': self.worked_time})
        self.mongo.update_data('work_state', self.query, {'content': self.work_state})

    def _trigger_timed_task(self, the_working_app, name):
        """Flag the working app for timed task ``name`` and persist the state."""
        self.task_control[the_working_app] = '定时'
        self.timed_task['task'] = name
        self.timed_task[name] = []
        # Saved under 'content', the key affrim_timed_task reads back.
        self.mongo.update_data('timed_task', self.query, {'content': self.timed_task})

    def update_count(self):
        """Timer tick: persist progress, advance counters, set control flags.

        Each tick adds 0.05 to ``how_long``, ``hour_note`` and the working
        app's ``worked_time``.  Depending on those counters and on the
        current HHMM time it writes ``'周期'``, ``'小时'``, ``'下班'``,
        ``'定时'`` or ``'跨夜'`` into ``task_control`` for the working app.
        The thresholds carry a random offset.
        """
        self.exit_()
        self.save_worked_info()
        the_working_app = self.the_working_info['the_working_app']
        # The stored value may be None when the app has no planned time.
        planed_work_time = self.the_working_info.get('planed_work_time') or 0
        self.how_long += 0.05
        self.hour_note += 0.05
        self.worked_time[the_working_app] = self.worked_time.get(the_working_app, 0) + 0.05
        in_main = self.working_task == 'main'
        if in_main and self.how_long > (22 + randint(1, 4)):
            self.task_control[the_working_app] = '周期'
            self.how_long = 0
        if in_main and self.hour_note > (60 + randint(1, 5)):
            self.task_control[the_working_app] = '小时'
            self.hour_note = 0
        if self.worked_time.get(the_working_app, 0) > (planed_work_time*10 + randint(3, 10)):
            self.task_control[the_working_app] = '下班'
        now_hour = int(time.strftime('%H%M'))
        if in_main:
            # The windows do not overlap, so at most one can match.
            for name, opens, closes in self._TIMED_WINDOWS:
                if (closes > now_hour > (opens + randint(3, 10))
                        and not self.timed_task.get(name)):
                    self._trigger_timed_task(the_working_app, name)
                    break
        if 2400 > now_hour > 2340 and not self.night:
            self.task_control[the_working_app] = '跨夜'
            self.log.info('进入跨夜模式')
        # if now_hour > 2350:
        # self.task_control[the_working_app] == '天'
if __name__ == '__main__':
    # The target device identifier is the only command-line argument,
    # e.g. ``python automain.py 981b558b``.
    if len(sys.argv) < 2:
        # Exit with a usage message instead of an IndexError traceback.
        sys.exit(f'usage: {sys.argv[0]} <mobile_device>')
    AutoMain(sys.argv[1]).start()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化