加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
Kafka2MySQL.py 2.60 KB
一键复制 编辑 原始数据 按行查看 历史
钰小霖 提交于 2022-02-14 11:42 . 创建
from kafka import KafkaConsumer
import pymysql
import time
import traceback
# -------- 1.连接MySQL -------------
def get_conn():
conn = pymysql.connect(host="***", # 本地ip或你的域名地址
user="***", # MySQL登录用户名
password="***", # MySQL登录密码
db="etc", # 储存数据的数据库名
charset="utf8")
# conn表示连接数据库,对数据的操作需要通过cursor来实现
cursor = conn.cursor()
return conn, cursor
def close_conn(conn, cursor):
if cursor:
cursor.close()
if conn:
conn.close()
# -------- 2.数据存入MySQL -------------
def update_data(result,num):
cursor = None
conn = None
try:
conn, cursor = get_conn()
# 插入数据
sql_insert = "insert into etc_all(XH, CP, CX1, CK, CKSJ, RK,RKSJ,CX2)" \
"values(%s,%s,%s,%s,%s,%s,%s,%s)"
# 设置“判断是否存在” 的SQL语句,不存在数据更新情况,所以如果有重复数据就选择不插入了
sql_ifExist="select XH from etc_all where XH=%s"
if not cursor.execute(sql_ifExist,num):
cursor.execute(sql_insert, result)
conn.commit()
print('%s数据更新完成!' % time.strftime("%Y-%m-%d %H:%M:%S"))
except:
traceback.print_exc()
finally:
close_conn(conn, cursor)
# -------- 3.连接kafka -------------
# 将插入数据的函数调用,放在获取消息的循环中即可
def Kafka2Mysql():
consumer = KafkaConsumer('etc_all',bootstrap_servers=['ip'],auto_offset_reset='earliest')
for m in consumer:
msg = m.value.decode('utf-8')
print(msg)
# 【output】
# 1,粤S88***,一型车(客),广东水朗D站,2020/12/22 0:00,广东龙景站,2020/12/21 23:39,深圳入
# -- 对每条消息进行处理 --
msg = msg.split(",") # 将接收的消息转换为列表类型
msg.pop(7) # 删除“深圳人”
msg[1] = msg[1][0] # 车牌只保留省份
msg.append(msg[2][4]) # 车类型(客/货)添加到列表尾部作为CX1属性列
msg[2] = msg[2][:3] # CX2属性列只保留"*型车"
print(msg)
# 【output】
# ['1', '粤', '一型车', '广东水朗D站', '2020/12/22 0:00', '广东龙景站', '2020/12/21 23:39', '客']
# -- 向MySQL插入数据--
update_data(msg, msg[0])
consumer.close()
if __name__ == '__main__':
Kafka2Mysql()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化