代码拉取完成,页面将自动刷新
from kafka import KafkaConsumer
import pymysql
import time
import traceback
# -------- 1.连接MySQL -------------
def get_conn():
conn = pymysql.connect(host="***", # 本地ip或你的域名地址
user="***", # MySQL登录用户名
password="***", # MySQL登录密码
db="etc", # 储存数据的数据库名
charset="utf8")
# conn表示连接数据库,对数据的操作需要通过cursor来实现
cursor = conn.cursor()
return conn, cursor
def close_conn(conn, cursor):
if cursor:
cursor.close()
if conn:
conn.close()
# -------- 2.数据存入MySQL -------------
def update_data(result,num):
cursor = None
conn = None
try:
conn, cursor = get_conn()
# 插入数据
sql_insert = "insert into etc_all(XH, CP, CX1, CK, CKSJ, RK,RKSJ,CX2)" \
"values(%s,%s,%s,%s,%s,%s,%s,%s)"
# 设置“判断是否存在” 的SQL语句,不存在数据更新情况,所以如果有重复数据就选择不插入了
sql_ifExist="select XH from etc_all where XH=%s"
if not cursor.execute(sql_ifExist,num):
cursor.execute(sql_insert, result)
conn.commit()
print('%s数据更新完成!' % time.strftime("%Y-%m-%d %H:%M:%S"))
except:
traceback.print_exc()
finally:
close_conn(conn, cursor)
# -------- 3.连接kafka -------------
# 将插入数据的函数调用,放在获取消息的循环中即可
def Kafka2Mysql():
consumer = KafkaConsumer('etc_all',bootstrap_servers=['ip'],auto_offset_reset='earliest')
for m in consumer:
msg = m.value.decode('utf-8')
print(msg)
# 【output】
# 1,粤S88***,一型车(客),广东水朗D站,2020/12/22 0:00,广东龙景站,2020/12/21 23:39,深圳入
# -- 对每条消息进行处理 --
msg = msg.split(",") # 将接收的消息转换为列表类型
msg.pop(7) # 删除“深圳人”
msg[1] = msg[1][0] # 车牌只保留省份
msg.append(msg[2][4]) # 车类型(客/货)添加到列表尾部作为CX1属性列
msg[2] = msg[2][:3] # CX2属性列只保留"*型车"
print(msg)
# 【output】
# ['1', '粤', '一型车', '广东水朗D站', '2020/12/22 0:00', '广东龙景站', '2020/12/21 23:39', '客']
# -- 向MySQL插入数据--
update_data(msg, msg[0])
consumer.close()
if __name__ == '__main__':
Kafka2Mysql()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。