加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
terminal_goals_task.py 3.15 KB
一键复制 编辑 原始数据 按行查看 历史
yun_abc 提交于 2022-05-13 16:15 . update terminal_goals_task.py.
#-*- coding: utf8 -*-
#@time : 2018-02-07
#@author : wu gao wen
#@email : yun2280@foxmal.com
import pandas as pd
import numpy as np
import pymysql
from apscheduler.schedulers.blocking import BlockingScheduler
host = 'your host'
user = 'your user'
passwd = 'your password'
db = 'your database'
con = pymysql.connect(host=host, user=user, passwd=passwd, db=db, port = 3306)
cur = con.cursor()
# 注SQL字段与内容要与实际相符,按销售流水日期排序
sql_sales_lists = " select terminal_id, sales, lastchanged "\
" where sales_date>='2022-01-01' order by lastchanged asc "\
# 查询消费明细
def read_table(cur,sql_sales_lists):
try:
cur.execute(sql_sales_lists) #
data = cur.fetchall()
frame = pd.DataFrame(list(data))
frame.columns = ['terminal_id','sales','lastchanged']
#print frame
except: #, e:
frame = pd.DataFrame()
# print e
# continue
return frame
con.commit()
cur.close()
con.close()
sql_goals = " select terminal_id ,goals from terminal_goals "
# 业绩指标
def read_goals(cur,sql_goals):
try:
cur.execute(sql_goals)
data = cur.fetchall()
frame = pd.DataFrame(list(data))
frame.columns = ['terminal_id','goals']
#print frame
except: #, e:
frame = pd.DataFrame()
# continue
return frame
con.commit()
cur.close()
con.close()
# dataframe
df = read_table(cur,sql_sales_lists)
# 分组累计
df['cumsum'] = df.groupby('terminal_id')['sales'].transform(pd.Series.cumsum)
df_goals = read_goals(cur,sql_goals)
#print df_goals
# update更新达到目标方法
def update_sql(cur,sql_update):
try:
cur.execute(sql_update)
except Exception as e: #, e:
con.rollback()
print ('fail',e)
else:
con.commit()
print ('update ok')
#cur.close()
#con.close()
# 查找实际业绩超过指标第一个值,break,并记录更新至 table_pk
def find_complete_goals(terminal_id,goals):
# 过滤数据
df_filter = df[(df['terminal_id']==terminal_id) & (df['cumsum']>=goals)]
#print df_filter
for index2 in df_filter.index :
if ((df_filter.loc[index2].ix['terminal_id']==terminal_id)&(df_filter.loc[index2].ix['cumsum']>=goals)) == True :
complete_date = str(df_filter.loc[index2].ix['lastchanged'])
terminal_id = str(terminal_id)
complete_sql = "update table_pk set complete_date = '"+complete_date+"' where terminal_id = '"+terminal_id+"'"
update_sql(cur,complete_sql)
break
else:
pass
def find_complete_date():
for indexs in find_complete_goals.index :
terminal_goals = (find_complete_goals.loc[indexs].values[0: 2])
find_complete_goals(terminal_goals[0],terminal_goals[1])
# 调度作业
#find_complete_date()
def my_job():
find_complete_date()
# 开启作业 间隔10分钟执行一次任务
sched = BlockingScheduler()
sched.add_job(my_job, 'interval', minutes=10)
sched.start()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化