# [web page residue, not part of the script] Code pull complete; the page will refresh automatically.
# -*- coding: utf-8 -*-
import json
import os
from datetime import datetime

import pymysql
import redis
from apscheduler.schedulers.blocking import BlockingScheduler
from sqlalchemy import Column, Integer, String, DateTime, Boolean, text, JSON
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
""" 定时任务,redis备份到数据库 """
# FAW Toyota production MySQL. The fallbacks below are the public-network
# endpoint; every field can be overridden through an environment variable so
# the script can target the intranet endpoint (host 172.16.0.9, port 3306)
# without editing the source.
# NOTE(review): the fallback password is committed in plain text; rotate it
# and supply BACKUP_DB_PASSWORD from the environment instead.
db_config = {
    'host': os.environ.get('BACKUP_DB_HOST', 'bj-cdb-pr2xxnfm.sql.tencentcdb.com'),
    # Environment values are strings; pymysql needs an integer port.
    'port': int(os.environ.get('BACKUP_DB_PORT', 59584)),
    'user': os.environ.get('BACKUP_DB_USER', 'root'),
    'password': os.environ.get('BACKUP_DB_PASSWORD', 'Ftms@2024'),
    'database': os.environ.get('BACKUP_DB_NAME', 'toyota_yi_feng'),
}
# Redis connection settings. The fallbacks below are the public-network
# endpoint; every field can be overridden through an environment variable so
# the script can target the intranet endpoint (host 172.16.0.3, port 6379)
# without editing the source.
# NOTE(review): the fallback password is committed in plain text; rotate it
# and supply BACKUP_REDIS_PASSWORD from the environment instead.
redis_config = {
    'host': os.environ.get('BACKUP_REDIS_HOST', 'bj-crs-dbsuxig7.sql.tencentcdb.com'),
    # Environment values are strings; keep the port an integer.
    'port': int(os.environ.get('BACKUP_REDIS_PORT', 22237)),
    'username': os.environ.get('BACKUP_REDIS_USERNAME', 'root'),
    'password': os.environ.get('BACKUP_REDIS_PASSWORD', 'Ftms@2024'),
}
# Shared SQLAlchemy engine. The URL carries only the dialect/driver; host,
# port, credentials and database name are passed to pymysql via connect_args.
engine = create_engine(
    'mysql+pymysql://',
    connect_args=db_config,
    pool_size=1,  # number of connections kept open in the pool
    # Test each connection when it is checked out of the pool. The backup is
    # scheduled once a day, so the pooled connection sits idle between runs
    # and may have been closed by the server in the meantime.
    pool_pre_ping=True,
)
def connect_redis(db_num):
    """Build a Redis client bound to logical database ``db_num``.

    Host, port, username and password are read from the module-level
    ``redis_config``; ``db_num`` is passed through ``int()`` before use.
    """
    options = {
        field: redis_config[field]
        for field in ('host', 'port', 'password', 'username')
    }
    return redis.Redis(db=int(db_num), **options)
# Read a JSON value from Redis
def redis_json_get(num, key):
    """Fetch ``key`` from Redis database ``num`` and decode it as JSON.

    Returns the decoded object, or ``None`` when Redis has no value for
    the key.
    """
    payload = connect_redis(num).get(key)
    return None if payload is None else json.loads(payload)
# Declarative base class that the dynamically built model classes inherit from.
Base = declarative_base()
# Model classes already built by submit_clue(), keyed by table name.
_SUBMIT_CLUE_MODELS = {}


def submit_clue(table_name):
    """Return the ORM model for a sales-clue table called ``table_name``.

    The model is built on first use and then reused: callers invoke this
    once per Redis key, and declaring a new ``DynamicModel`` subclass of
    ``Base`` on every call would re-map the same table again and again.

    Args:
        table_name: Name of the MySQL table the model maps to.

    Returns:
        The mapped ``DynamicModel`` class for ``table_name``.
    """
    cached = _SUBMIT_CLUE_MODELS.get(table_name)
    if cached is not None:
        return cached

    class DynamicModel(Base):
        __tablename__ = table_name
        # Allow the same table name to be declared again on Base.metadata.
        __table_args__ = {'extend_existing': True}
        id = Column(Integer, primary_key=True, autoincrement=True)
        openid = Column(String(255), unique=True, index=True)
        phone = Column(String(255), nullable=True)
        name = Column(String(255), nullable=True)
        dealerId = Column(String(255), nullable=True)
        seriesId = Column(Integer, nullable=True)
        create_time = Column(DateTime, default=datetime.now)

    _SUBMIT_CLUE_MODELS[table_name] = DynamicModel
    return DynamicModel
# Model classes already built by prize_result(), keyed by table name.
_PRIZE_RESULT_MODELS = {}


def prize_result(table_name):
    """Return the ORM model for a prize-result table called ``table_name``.

    The model is built on first use and then reused: callers invoke this
    once per Redis key, and declaring a new ``DynamicModel`` subclass of
    ``Base`` on every call would re-map the same table again and again.

    Args:
        table_name: Name of the MySQL table the model maps to.

    Returns:
        The mapped ``DynamicModel`` class for ``table_name``.
    """
    cached = _PRIZE_RESULT_MODELS.get(table_name)
    if cached is not None:
        return cached

    class DynamicModel(Base):
        __tablename__ = table_name
        # Allow the same table name to be declared again on Base.metadata.
        __table_args__ = {'extend_existing': True}
        id = Column(Integer, primary_key=True, autoincrement=True, comment='主键id')
        openid = Column(String(255), nullable=False, comment='用户openid')
        phone = Column(String(255), nullable=True, comment='手机号')
        dealerCode = Column(String(255), nullable=True, comment='经销商id')
        prize_name = Column(String(255), nullable=True, comment='奖品名称')
        check_code = Column(String(255))
        state = Column(Integer, nullable=False, comment='1中奖0未中奖2答题')
        vx_card = Column(JSON, server_default=text("'{}'"), comment='json数据')
        create_time = Column(DateTime, nullable=False, comment='创建时间')

    _PRIZE_RESULT_MODELS[table_name] = DynamicModel
    return DynamicModel
# Execute a raw SQL statement
def native_sql(sql):
    """Run one raw SQL statement on a dedicated, short-lived pymysql connection.

    Returns the value of ``cursor.execute(sql)`` once the transaction has
    been committed. If executing or committing raises, the error is printed,
    the transaction is rolled back and ``None`` is returned. The cursor and
    the connection are closed in every case.
    """
    conn = pymysql.connect(**db_config)  # not shared; opened per call
    cur = conn.cursor()
    try:
        outcome = cur.execute(sql)
        conn.commit()
    except Exception as exc:
        print("Error:", exc)
        conn.rollback()
        return None
    else:
        return outcome
    finally:
        cur.close()
        conn.close()
# Clone a table's structure
def copy_structure(new_table, old_table):
    """Create ``new_table`` with the same structure as ``old_table``.

    Issues ``CREATE TABLE ... LIKE ...`` through ``native_sql`` and returns
    its result; the original author notes that 0 means success. Both names
    are interpolated into the SQL text unescaped, so pass trusted
    identifiers only.
    """
    statement = f"CREATE TABLE {new_table} LIKE {old_table}"
    print('复制表结构', statement)
    return native_sql(statement)
def sell_clue(redis_num):
    """Copy every sales-clue record of one Redis database into MySQL.

    A snapshot table ``frontapp_submit_clue_<MMDD_HH>`` is created with the
    structure of ``frontapp_submit_clue``. Each Redis key is then treated as
    an openid whose value is a JSON object with the fields ``phone``,
    ``name``, ``dealerId``, ``seriesId`` and ``create_time``, and is inserted
    as one row. A record that cannot be read or inserted is reported on
    stdout and skipped; the remaining records are still processed.

    Args:
        redis_num: Index of the Redis logical database to back up.
    """
    cn = connect_redis(redis_num)
    keys = cn.keys()
    # NOTE(review): the suffix carries no year, so the same table name comes
    # round again twelve months later - confirm old snapshots are removed.
    table_name = 'frontapp_submit_clue_' + datetime.now().strftime("%m%d_%H")
    res1 = copy_structure(table_name, 'frontapp_submit_clue')
    print('已经开始同步线索提交' + str(res1))

    # The target table is the same for every key, so build the model once.
    model = submit_clue(table_name)
    session = sessionmaker(bind=engine)()
    try:
        for key in keys:
            try:
                # Reuse the client that is already open instead of opening a
                # new Redis client for every key.
                raw = cn.get(key)
                if raw is None:
                    # The key expired or was deleted after KEYS returned it.
                    print(f"error {key}: value missing")
                    continue
                dic = json.loads(raw)
                new_obj = model(openid=key.decode(), phone=dic['phone'], name=dic['name'],
                                dealerId=dic['dealerId'], seriesId=dic['seriesId'],
                                create_time=dic['create_time'])
                session.add(new_obj)
                # Commit row by row so one bad record cannot discard the others.
                session.commit()
            except Exception as e:
                # Without a rollback the session stays in its failed state
                # and every later commit would be rejected as well.
                session.rollback()
                print(f"error {key}: {e}")
    finally:
        # Hand the connection back to the pool.
        session.close()
def go_prize_result(redis_num):
    """Copy every prize-result record of one Redis database into MySQL.

    A snapshot table ``frontapp_prize_result_<MMDD_HH>`` is created with the
    structure of ``frontapp_prize_result``. Each Redis key is then treated as
    an openid whose value is a JSON object with the fields ``phone``,
    ``dealerCode``, ``prize_name``, ``check_code``, ``state``, ``vx_card``
    and ``create_time``, and is inserted as one row. A record that cannot be
    read or inserted is reported on stdout and skipped; the remaining
    records are still processed.

    Args:
        redis_num: Index of the Redis logical database to back up.
    """
    cn = connect_redis(redis_num)
    keys = cn.keys()
    # NOTE(review): the suffix carries no year, so the same table name comes
    # round again twelve months later - confirm old snapshots are removed.
    table_name = 'frontapp_prize_result_' + datetime.now().strftime("%m%d_%H")
    res1 = copy_structure(table_name, 'frontapp_prize_result')
    # The original message was copied from sell_clue and announced the
    # clue-submission sync; it now names the prize-result sync.
    print('已经开始同步抽奖结果' + str(res1))

    # The target table is the same for every key, so build the model once.
    model = prize_result(table_name)
    session = sessionmaker(bind=engine)()
    try:
        for key in keys:
            try:
                # Reuse the client that is already open instead of opening a
                # new Redis client for every key.
                raw = cn.get(key)
                if raw is None:
                    # The key expired or was deleted after KEYS returned it.
                    print(f"error {key}: value missing")
                    continue
                dic = json.loads(raw)
                new_obj = model(openid=key.decode(), phone=dic['phone'], dealerCode=dic['dealerCode'],
                                prize_name=dic['prize_name'], check_code=dic['check_code'],
                                state=dic['state'], vx_card=dic['vx_card'],
                                create_time=dic['create_time'])
                session.add(new_obj)
                # Commit row by row so one bad record cannot discard the others.
                session.commit()
            except Exception as e:
                # Without a rollback the session stays in its failed state
                # and every later commit would be rejected as well.
                session.rollback()
                print(f"error {key}: {e}")
    finally:
        # Hand the connection back to the pool.
        session.close()
def job():
    """Back up every configured Redis database, one after another.

    Databases 7-10 go through ``sell_clue`` and databases 14-19 through
    ``go_prize_result``, in ascending order.
    """
    for clue_db in (7, 8, 9, 10):
        sell_clue(clue_db)
    for prize_db in (14, 15, 16, 17, 18, 19):
        go_prize_result(prize_db)
# Daily scheduled task
def timed_task(hour, minute, def_name):
    """Register ``def_name`` on a cron trigger and start the scheduler.

    ``hour`` and ``minute`` are passed to the cron trigger as-is. The call
    blocks, because a ``BlockingScheduler`` is started.
    """
    scheduler = BlockingScheduler()
    print('>>>定时任务已启动')
    scheduler.add_job(def_name, trigger='cron', hour=hour, minute=minute)
    scheduler.start()
if __name__ == '__main__':
    # Start the backup every day at 01:01.
    timed_task(hour=1, minute=1, def_name=job)
# Run in the background and discard all output:
# nohup python3 redis_backup_db.py >/dev/null 2>&1 &
# Run in the background and log stdout and stderr to one file:
# nohup python3 redis_backup_db.py > ~/log_service/redis_backup_db.log 2>&1 &
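# Redis databases that need to be backed up:
# 7 sales clues 1
# 8 sales clues 2
# 9 sales clues 3
# 10 sales clues 4
# 14 prize results 1--012
# 15 prize results 2--345
# 16 prize results 3--678
# 17 prize results 4--9AB
# 18 prize results 5--CD
# 19 prize results 6--EF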
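# [web page residue, not part of the script] This section may contain content unsuitable for display, so the page does not show it. You can review and revise it using the relevant editing features.
# [web page residue, not part of the script] If you confirm the content involves no inappropriate language / pure advertising / violence / vulgar or pornographic material / infringement / piracy / false information / worthless content, and nothing that violates national laws and regulations, you may click Submit to appeal and it will be handled as soon as possible.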