加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
car_plate_info.py 6.99 KB
一键复制 编辑 原始数据 按行查看 历史
jczhang02 提交于 2024-09-08 15:22 . feat(version2)
import pymysql.cursors
import logging
import uuid
from datetime import datetime
import json
from flask import request,jsonify
import system_setup
import requests
from configparser import ConfigParser
from hk.gate_control import cate_control_auto
# Load runtime settings from setting.ini (read as UTF-8).
conf = ConfigParser()
conf.read("setting.ini", encoding="utf-8")
# Configure the root logger: DEBUG level, timestamped records.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
# 数据库连接
def mysql():
    """Open a new PyMySQL connection to the ``device_info`` database.

    Host, port, user and password are read from the ``mysql_settings``
    section of the module-level ``conf`` parser. The connection uses
    ``DictCursor``, so fetched rows are dictionaries keyed by column name.

    Returns:
        The connection object produced by ``pymysql.connect``. The caller
        is responsible for closing it.
    """
    section = "mysql_settings"
    connect_kwargs = {
        "host": conf.get(section, "host"),
        # The ini value is text; the driver needs an integer port.
        "port": int(conf.get(section, "port")),
        "user": conf.get(section, "user"),
        "password": conf.get(section, "password"),
        "database": "device_info",
        "cursorclass": pymysql.cursors.DictCursor,
    }
    return pymysql.connect(**connect_kwargs)
# 车牌新增、更新
def car_plate_add_update():
    """Create a license-plate record, or replace an existing one.

    Request (Flask ``request``):
        * ``Authorization`` header: the first 7 characters are dropped
          (presumably the ``"Bearer "`` prefix -- TODO confirm) and the rest
          is passed to ``system_setup.getDecodeToken``.
        * JSON body: ``car_plate`` (required) and ``id`` (optional). When the
          ``id`` key is present the existing row is replaced (same id, new
          plate, fresh ``create_time``); otherwise a row with a new UUID is
          inserted.

    Returns:
        A ``jsonify`` response. ``code=0`` on success; ``code=1`` with a
        message on validation failure, duplicate plate or internal error;
        token errors (``1`` / ``401``) are passed through unchanged.
    """
    response = system_setup.getDecodeToken(
        request.headers.get("Authorization", "")[7:]
    )
    token = json.loads(response.data)
    if token['code'] == 1:
        return jsonify(msg=token['msg'], code=1)
    if token['code'] == 401:
        return jsonify(msg=token['msg'], code=token['code'])

    # pymysql.connect() already opens the connection, so no extra
    # mydb.connect() call is needed (it would open a second socket).
    mydb = mysql()
    try:
        with mydb.cursor() as cursor:
            jsons = request.json
            car_plate = jsons['car_plate']
            create_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            is_update = 'id' in jsons
            if is_update:
                record_id = jsons['id']
                if record_id is None or record_id == '':
                    return jsonify(code=1, msg="id不能为空!")
                cursor.execute(
                    "SELECT car_plate FROM car_plate_info WHERE id = %s",
                    (record_id,),
                )
                existing = cursor.fetchone()
                if existing is None:
                    # Report an unknown id explicitly instead of crashing on
                    # None['car_plate'] and returning a generic server error.
                    return jsonify(code=1, msg="记录不存在")
                # An unchanged plate would collide with its own row, so the
                # uniqueness check only applies when the plate changes.
                check_duplicate = car_plate != existing['car_plate']
            else:
                record_id = str(uuid.uuid4())
                check_duplicate = True

            if check_duplicate:
                cursor.execute(
                    "SELECT car_plate FROM car_plate_info WHERE car_plate = %s",
                    (car_plate,),
                )
                if cursor.fetchone() is not None:
                    return jsonify(code=1, msg="车牌已存在")

            # DELETE and INSERT share one transaction with a single commit,
            # so a failed INSERT can no longer lose the original row.
            # Nothing is committed if an exception is raised first.
            if is_update:
                cursor.execute(
                    "DELETE FROM car_plate_info WHERE id = %s", (record_id,)
                )
            cursor.execute(
                "insert into car_plate_info (id,car_plate,create_time) "
                "values(%s,%s,%s)",
                (record_id, car_plate, create_time),
            )
            mydb.commit()
    except Exception:
        logging.exception("car_plate_add_update failed")
        return jsonify(code=1, msg="Server internal error")
    finally:
        mydb.close()
    return jsonify(msg="success", code=0)
# 车牌分页列表
def car_plate_list_page():
    """Return one page of license-plate records, newest first.

    Request (Flask ``request``):
        * ``Authorization`` header: the first 7 characters are dropped
          (presumably the ``"Bearer "`` prefix -- TODO confirm) and the rest
          is passed to ``system_setup.getDecodeToken``.
        * Query string: ``page`` and ``limit`` (required, positive integers)
          and ``car_plate`` (optional substring filter).

    Returns:
        A ``jsonify`` response. On success ``code=0`` and ``data`` holds
        ``totalSize``, ``pages``, ``pageSize``, ``Items``, ``currentPage``
        and ``currentSize``. ``pageSize`` and ``currentPage`` echo the
        query-string values as received; ``currentSize`` is the number of
        items actually returned. ``code=1`` is returned for missing or
        invalid parameters and for internal errors; token errors
        (``1`` / ``401``) are passed through unchanged.
    """
    response = system_setup.getDecodeToken(
        request.headers.get("Authorization", "")[7:]
    )
    token = json.loads(response.data)
    if token['code'] == 1:
        return jsonify(msg=token['msg'], code=1)
    if token['code'] == 401:
        return jsonify(msg=token['msg'], code=token['code'])

    args = request.args
    if 'page' not in args or 'limit' not in args:
        return jsonify(code=1, msg="缺少必要参数page、limit")
    page = args.get("page")
    limit = args.get("limit")
    # Validate up front: limit=0 used to raise ZeroDivisionError and
    # non-positive values produced meaningless slices.
    try:
        page_no = int(page)
        page_size = int(limit)
    except (TypeError, ValueError):
        return jsonify(code=1, msg="page、limit必须为正整数")
    if page_no < 1 or page_size < 1:
        return jsonify(code=1, msg="page、limit必须为正整数")

    # Bind the filter as a parameter instead of concatenating it into SQL.
    sql = "SELECT * from car_plate_info"
    params = None
    keyword = args.get("car_plate")
    if keyword is not None:
        sql += " WHERE car_plate LIKE %s"
        params = ("%" + keyword + "%",)
    sql += " order by create_time desc"

    # pymysql.connect() already opens the connection, so no extra
    # mydb.connect() call is needed.
    mydb = mysql()
    try:
        with mydb.cursor() as cursor:
            cursor.execute(sql, params)
            rows = cursor.fetchall()
    except Exception:
        logging.exception("car_plate_list_page failed")
        return jsonify(code=1, msg="Server internal error")
    finally:
        mydb.close()

    start = page_size * (page_no - 1)
    items = [
        {
            'id': row['id'],
            'car_plate': row['car_plate'],
            'create_time': row['create_time'],
        }
        for row in rows[start:start + page_size]
    ]
    total = len(rows)
    rlt_data = {
        "totalSize": total,
        # Ceiling division; the old "total // limit + 1" over-counted when
        # total was an exact multiple of limit. An empty result still
        # reports one page, as before.
        "pages": max(1, -(-total // page_size)),
        "pageSize": limit,
        "Items": items,
        "currentPage": page,
        # Real item count: correct for full last pages and out-of-range pages.
        "currentSize": len(items),
    }
    return jsonify(msg="success", code=0, data=rlt_data)
# 车牌删除
def car_plate_del():
    """Delete a license-plate record by id.

    Request (Flask ``request``):
        * ``Authorization`` header: the first 7 characters are dropped
          (presumably the ``"Bearer "`` prefix -- TODO confirm) and the rest
          is passed to ``system_setup.getDecodeToken``.
        * JSON body: ``id`` (required, non-empty).

    Returns:
        A ``jsonify`` response. ``code=0`` on success (also when no row
        matched the id); ``code=1`` when ``id`` is missing or empty, or on
        internal error; token errors (``1`` / ``401``) are passed through
        unchanged.
    """
    response = system_setup.getDecodeToken(
        request.headers.get("Authorization", "")[7:]
    )
    token = json.loads(response.data)
    if token['code'] == 1:
        return jsonify(msg=token['msg'], code=1)
    if token['code'] == 401:
        return jsonify(msg=token['msg'], code=token['code'])

    # pymysql.connect() already opens the connection, so no extra
    # mydb.connect() call is needed.
    mydb = mysql()
    try:
        with mydb.cursor() as cursor:
            jsons = request.json
            if 'id' not in jsons:
                return jsonify(code=1, msg="缺少必要参数id")
            record_id = jsons['id']
            if record_id is None or record_id == '':
                return jsonify(code=1, msg="id不能为空!")
            # Bind the id as a parameter; the request body is untrusted.
            cursor.execute(
                "DELETE FROM car_plate_info WHERE id = %s", (record_id,)
            )
        mydb.commit()
    except Exception:
        logging.exception("car_plate_del failed")
        return jsonify(code=1, msg="Server internal error")
    finally:
        mydb.close()
    return jsonify(msg="success", code=0)
# 车牌识别
def car_plate_distinguish(jsons):
    """Open the gate for a recognised plate if it is registered.

    Args:
        jsons: Mapping with ``sLicense`` (the recognised plate string) and
            ``number`` (used to look up the row in ``device_info``).

    Returns:
        The string ``"车牌不存在!"`` when the plate is not in
        ``car_plate_info``; otherwise ``None``, including when the device
        row is missing or an error occurred. Errors are logged, not raised.
    """
    # pymysql.connect() already opens the connection, so no extra
    # mydb.connect() call is needed.
    mydb = mysql()
    try:
        with mydb.cursor() as cursor:
            # The first character of the reported plate is dropped before
            # lookup (presumably a colour/type prefix from the camera --
            # TODO confirm).
            car_plate = jsons['sLicense'][1:]
            number = jsons['number']
            # Bind values as parameters; the payload is external input.
            cursor.execute(
                "SELECT * from car_plate_info where car_plate = %s",
                (car_plate,),
            )
            if cursor.fetchone() is None:
                return "车牌不存在!"
            cursor.execute(
                "SELECT * from device_info where number = %s", (number,)
            )
            device_info = cursor.fetchone()
        if device_info is None:
            # This used to surface as a swallowed TypeError; log it clearly.
            logging.warning(
                "No device_info row for number %r; gate not opened", number
            )
            return None
        cate_control_auto(
            device_info['camera_ip'],
            device_info['camera_account'],
            device_info['camera_password'],
        )
    except Exception:
        logging.exception("car_plate_distinguish failed")
    finally:
        mydb.close()
    return None
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化