加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
upload_and_download.py 22.79 KB
一键复制 编辑 原始数据 按行查看 历史
jczhang02 提交于 2024-09-08 15:22 . feat(version2)

from io import BytesIO
import xlsxwriter
import pandas as pd
from flask import make_response, request, jsonify
import pymysql.cursors
from datetime import datetime
import uuid
import numpy as np
from configparser import ConfigParser
import os
import requests
import logging
# 读取配置文件
conf = ConfigParser()
conf.read("setting.ini", encoding='utf-8')
# 数据库连接
def mysql():
mydb = pymysql.connect(
host=conf.get("mysql_settings", "host"),
port=int(conf.get("mysql_settings", "port")),
user=conf.get("mysql_settings", "user"),
password=conf.get("mysql_settings", "password"),
database='device_info',
cursorclass=pymysql.cursors.DictCursor
)
return mydb
# 设备导入模板
def deviceTemplateDownload():
output = BytesIO()
# 创建Excel文件,不保存,直接输出
workbook = xlsxwriter.Workbook(output, {'in_memory': True})
# 设置Sheet的名字为download
worksheet = workbook.add_worksheet('设备导入')
# 列首
title = ["设备名称", "设备编号", "位置", "摄像头账号", "摄像头密码", "摄像头IP","摄像头端口"]
worksheet.write_row('A1', title)
dictList = [{"设备名称": "必填项,最多一次导入500个设备,填写请删除该提示;", "设备编号": "必填项,填写请删除该提示;",
"位置": "必填项,填写请删除该提示;",
"摄像头账号": "非必填,只有导入摄像头的时候填写该参数;填写请删除该提示;",
"摄像头密码": "非必填,只有导入摄像头的时候填写该参数;填写请删除该提示;",
"摄像头IP": "非必填,只有导入摄像头的时候填写该参数;填写请删除该提示;",
"摄像头端口": "非必填,只有导入摄像头的时候填写该参数;填写请删除该提示;"}]
for i in range(len(dictList)):
row = [dictList[i]["设备名称"], dictList[i]["设备编号"], dictList[i]["位置"], dictList[i]["摄像头账号"], dictList[i]["摄像头密码"],
dictList[i]["摄像头IP"], dictList[i]["摄像头端口"]]
worksheet.write_row('A' + str(i + 2), row)
workbook.close()
response = make_response(output.getvalue())
output.close()
return response
# 设备导入
def deviceUpload():
mydb = mysql()
mydb.connect()
cursor = mydb.cursor()
try:
jsons = request.form
if jsons.__contains__('deviceClassify') and jsons.__contains__('productId') and jsons.__contains__('deviceClass') and jsons.__contains__('deviceType') and jsons.__contains__('is_children'):
# 获取文件流
file = request.files.get("file")
# 获取文件名
fileName = file.filename
fileNameArr = fileName.split(".")
# 校验文件后缀格式
if fileNameArr[1] != "xlsx": return jsonify(code=1, msg="文件后缀格式不正确,应为xlsx;")
# 是否是子设备
is_children = jsons.get("is_children")
# 传输装置编码
trans_device_code = ''
if int(is_children) == 1:
# 传输装置编码
trans_device_code = jsons.get("trans_device_code")
# 系统分类:1 视频设备2 消防设备
deviceClassify = jsons.get("deviceClassify")
# 产品id
productId = jsons.get("productId")
# 设备分类id
deviceClass = jsons.get("deviceClass")
# 设备类型id
deviceType = jsons.get("deviceType")
# 创建日期
time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
df = pd.read_excel(file)
# 表格数据
dataMapList = df.to_dict(orient='records')
keyMap = ['设备名称', '设备编号', '位置', '安装人员', '安装人员电话', '摄像头账号', '摄像头密码','摄像头IP', '摄像头端口']
# 校验表头是否和模板一致
for tableHeaderMap in dataMapList:
# 获取表头
tableHeaders = pd.Series(tableHeaderMap).index.values
for tableHeader in tableHeaders:
if tableHeader not in keyMap:
return jsonify(code=1, msg="文件表头不匹配")
break
sql = "SELECT number from device_info"
cursor.execute(sql)
cursor_date = cursor.fetchall()
numberForDataBaseList = []
for num in cursor_date:
numberForDataBaseList.append(num['number'])
# 组装最终插入数据
insertList = []
# 错误数据
errorList = []
# 用于存储导入文档中已有的设备号
numberForImpList = []
for dataMap in dataMapList:
# 设备名称
deviceName = ''
# 设备号
number = ''
# 位置
position = ''
# 安装人员
#install_person = ''
# 安装人员电话
#install_person_phone = ''
# 摄像头账号
camera_account = ''
# 摄像头密码
camera_password = ''
# 摄像头ip
camera_ip = ''
# 摄像头端口
camera_port = ''
# 是否有错误信息
errorFlag = True
# 错误信息
error = ''
for key in dataMap:
if dataMap[key] is None:
dataMap[key] = ''
elif type(dataMap[key]) == float:
if np.isnan(float(dataMap[key])):
dataMap[key] = ''
if key == '设备名称' or key == '设备编号' or key == '位置' or key == '摄像头账号' or key == '摄像头密码' or key == '摄像头IP' or key == '摄像头端口':
if key == '设备名称':
deviceName = dataMap[key]
elif key == '设备编号':
number = dataMap[key]
elif key == '位置':
position = dataMap[key]
# elif key == '安装人员':
# install_person = dataMap[key]
# elif key == '安装人员电话':
# install_person_phone = dataMap[key]
elif key == '摄像头账号':
camera_account = dataMap[key]
elif key == '摄像头密码':
camera_password = dataMap[key]
elif key == '摄像头IP':
camera_ip = dataMap[key]
elif key == '摄像头端口':
camera_port = dataMap[key]
value = dataMap[key]
if value == None or value == '':
if deviceClass != '1001':
if key != '摄像头账号' and key != '摄像头密码' and key != '摄像头IP' and key != '摄像头端口':
errorFlag = False
error = error + key + "不能为空;"
else:
errorFlag = False
error = error + key + "不能为空;"
if errorFlag:
if number in numberForImpList:
errorFlag = False
error = error + "文档中已经导入过此仪表设备号;"
if str(number) in numberForDataBaseList:
errorFlag = False
error = error + "数据库中已经存在此仪表设备号;"
if not errorFlag:
dataMap["错误信息"] = error
errorList.append(dataMap)
errorFlag = True
else:
data = []
data.append(str(uuid.uuid4()))
data.append(deviceName)
data.append(number)
data.append(time)
data.append(position)
# data.append(install_person)
# data.append(install_person_phone)
data.append('1')
data.append(time)
data.append(time)
data.append(deviceType)
data.append(camera_account)
data.append(camera_password)
data.append(camera_ip)
data.append(camera_port)
data.append(deviceClassify)
data.append(productId)
data.append(deviceClass)
data.append(is_children)
data.append(trans_device_code)
insertList.append(data)
if len(insertList) > 0:
sql = "insert into device_info (id,name,number,install_time,position,current_state,create_time,normal_update_time,device_type,camera_account,camera_password,camera_ip,camera_port,device_classify,product_id,device_class,is_children,trans_device_code) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
insert = cursor.executemany(sql, insertList)
mydb.commit()
cursor.close() # 关闭游标
else:
return jsonify(code=1, msg="缺少必要参数deviceClassify、productId、deviceClass、deviceType、is_children")
except Exception as e:
print(e)
return jsonify(
code=1,
msg="Server internal error"
)
finally:
cursor.close() # 关闭游标
mydb.close() # 再关闭数据库连接
if len(errorList) <= 0:
return jsonify(
msg="success",
code=0
)
else:
response = deviceErrorListDownload(errorList)
response.headers['Content-Type'] = "utf-8"
response.headers["Cache-Control"] = "no-cache"
response.headers["Content-Disposition"] = "attachment; filename=deviceErrorListDownload.xlsx"
return response
# 错误信息下载
def deviceErrorListDownload(dictList):
output = BytesIO()
# 创建Excel文件,不保存,直接输出
workbook = xlsxwriter.Workbook(output, {'in_memory': True})
# 设置Sheet的名字为download
worksheet = workbook.add_worksheet('错误信息')
# 列首
title = ["设备名称", "设备编号", "位置", "安装人员", "安装人员电话", "摄像头账号", "摄像头密码", "摄像头IP",
"摄像头端口", "错误信息"]
worksheet.write_row('A1', title)
for i in range(len(dictList)):
row = [dictList[i]["设备名称"], dictList[i]["设备编号"], dictList[i]["位置"], dictList[i]["安装人员"],
dictList[i]["安装人员电话"], dictList[i]["摄像头账号"], dictList[i]["摄像头密码"],
dictList[i]["摄像头IP"], dictList[i]["摄像头端口"], dictList[i]["错误信息"]]
worksheet.write_row('A' + str(i + 2), row)
workbook.close()
response = make_response(output.getvalue())
output.close()
return response
# 上传文件
def upload():
fname = request.files.get('file') # 获取上传的文件
ip = request.remote_addr
if fname:
time = datetime.now().strftime("%Y%m%d%H%M%S")
new_fname = r'/mnt/work/www/upload/' + time + fname.filename
fname.save(new_fname) # 保存文件到指定路径
return jsonify(
code=0,
msg="success",
data="http://"+ip+"/upload/" + time + fname.filename
)
else:
return jsonify(
code=1,
msg="Server internal error"
)
# 上传许可
def upload_permit():
fname = request.files.get('file') # 获取上传的文件
if fname:
if 'crt' in fname.filename:
new_fname = r'./dev.crt'
fname.save(new_fname) # 保存文件到指定路径
cmd = "openssl verify -CAfile rootCA.crt -partial_chain dev.crt"
result = os.popen(cmd).read()
if result != '' and 'OK' in result:
return jsonify(
code=0,
msg="success",
data="上传许可成功,请重新登录!"
)
else:
return jsonify(
code=1,
msg="许可无效,请重新上传!"
)
else:
return jsonify(
code=1,
msg="上传许可文件格式不正确!"
)
else:
return jsonify(
code=1,
msg="Server internal error"
)
# 上传解析驱动
def upload_analysis():
fname = request.files.get('file') # 获取上传的文件
if fname:
if 'py' in fname.filename:
new_fname = r'./analysis/'+fname.filename
fname.save(new_fname) # 保存文件到指定路径
return jsonify(
code=0,
msg="success"
)
else:
return jsonify(
code=1,
msg="上传解析文件格式不正确!"
)
else:
return jsonify(
code=1,
msg="Server internal error"
)
# 上传配置文件(数据库)
def upload_profile():
fname = request.files.get('file') # 获取上传的文件
if fname:
if 'mysql5.7.tar.gz' in fname.filename:
new_fname = r'/mnt/work/www/upload/'+fname.filename
fname.save(new_fname) # 保存文件到指定路径
cmd = "nsenter -m -n -t 1 sudo -i tar -zxvf /mnt/work/www/upload/mysql5.7.tar.gz -C /"
result = os.popen(cmd).read()
cmd = "nsenter -m -n -t 1 sudo -i docker restart mysql"
result = os.popen(cmd).read()
return jsonify(
code=0,
msg="success"
)
else:
return jsonify(
code=1,
msg="不是导出文件,请用导出的文件导入!"
)
else:
return jsonify(
code=1,
msg="Server internal error"
)
# 下载驱动
def download_drive():
try:
jsons = request.json
url = jsons['url']
my_file = requests.get(url)
file_name = url.split('/')[5]
open('./analysis/'+file_name,'wb').write(my_file.content)
except Exception as e:
print(e)
return jsonify(
code=1,
msg="Server internal error"
)
return jsonify(
code=0,
msg="success"
)
# 消防部件导入模板
def component_template_download():
output = BytesIO()
# 创建Excel文件,不保存,直接输出
workbook = xlsxwriter.Workbook(output, {'in_memory': True})
# 设置Sheet的名字为download
worksheet = workbook.add_worksheet('消防部件导入')
# 列首
title = ["部件名称", "部件编号", "位置"]
worksheet.write_row('A1', title)
dictList = [{"部件名称": "必填项,最多一次导入500个设备,填写请删除该提示;", "部件编号": "必填项,填写请删除该提示;",
"位置": "必填项,填写请删除该提示;"}]
for i in range(len(dictList)):
row = [dictList[i]["部件名称"], dictList[i]["部件编号"], dictList[i]["位置"]]
worksheet.write_row('A' + str(i + 2), row)
workbook.close()
response = make_response(output.getvalue())
output.close()
return response
# 消防部件导入
def component_upload():
mydb = mysql()
mydb.connect()
cursor = mydb.cursor()
try:
jsons = request.form
if jsons.__contains__('trans_device_code') and jsons.__contains__('part_type') and jsons.__contains__(
'part_type_name'):
# 获取文件流
file = request.files.get("file")
# 获取文件名
fileName = file.filename
fileNameArr = fileName.split(".")
# 校验文件后缀格式
if fileNameArr[1] != "xlsx": return jsonify(code=1, msg="文件后缀格式不正确,应为xlsx;")
# 传输装置编码
trans_device_code = jsons.get("trans_device_code")
# 部件类型id
part_type = jsons.get("part_type")
# 部件类型名称
part_type_name = jsons.get("part_type_name")
# 创建日期
time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
df = pd.read_excel(file)
# 表格数据
dataMapList = df.to_dict(orient='records')
keyMap = ["部件名称", "部件编号", "位置"]
# 校验表头是否和模板一致
for tableHeaderMap in dataMapList:
# 获取表头
tableHeaders = pd.Series(tableHeaderMap).index.values
for tableHeader in tableHeaders:
if tableHeader not in keyMap:
return jsonify(code=1, msg="文件表头不匹配")
break
sql = "SELECT number from fire_fighting_component"
cursor.execute(sql)
cursor_date = cursor.fetchall()
numberForDataBaseList = []
for num in cursor_date:
numberForDataBaseList.append(num['number'])
# 组装最终插入数据
insertList = []
# 错误数据
errorList = []
# 用于存储导入文档中已有的设备号
numberForImpList = []
for dataMap in dataMapList:
# 部件名称
name = ''
# 部件编号
number = ''
# 位置
position = ''
# 是否有错误信息
errorFlag = True
# 错误信息
error = ''
for key in dataMap:
if dataMap[key] is None:
dataMap[key] = ''
elif type(dataMap[key]) == float:
if np.isnan(float(dataMap[key])):
dataMap[key] = ''
if key == '部件名称' or key == '部件编号' or key == '位置':
if key == '部件名称':
name = dataMap[key]
elif key == '部件编号':
number = dataMap[key]
elif key == '位置':
position = dataMap[key]
value = dataMap[key]
if value == None or value == '':
errorFlag = False
error = error + key + "不能为空;"
if errorFlag:
if number in numberForImpList:
errorFlag = False
error = error + "文档中已经导入过此仪表设备号;"
if str(number) in numberForDataBaseList:
errorFlag = False
error = error + "数据库中已经存在此仪表设备号;"
if not errorFlag:
dataMap["错误信息"] = error
errorList.append(dataMap)
else:
data = []
data.append(str(uuid.uuid4()))
data.append(name)
data.append(number)
data.append(position)
data.append('1')
data.append(time)
data.append(time)
data.append(part_type)
data.append(part_type_name)
data.append(trans_device_code)
insertList.append(data)
if len(insertList) > 0:
sql = "insert into fire_fighting_component (id,name,number,position,current_state,create_time,normal_update_time,part_type,part_type_name,trans_device_code) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
insert = cursor.executemany(sql, insertList)
mydb.commit()
cursor.close() # 关闭游标
else:
return jsonify(code=1, msg="缺少必要参数trans_device_code、part_type、part_type_name")
except Exception as e:
print(e)
return jsonify(
code=1,
msg="Server internal error"
)
finally:
cursor.close() # 关闭游标
mydb.close() # 再关闭数据库连接
if len(errorList) <= 0:
return jsonify(
msg="success",
code=0
)
else:
response = component_error_list_download(errorList)
response.headers['Content-Type'] = "utf-8"
response.headers["Cache-Control"] = "no-cache"
response.headers["Content-Disposition"] = "attachment; filename=component_error_list_download.xlsx"
return response
# 消防部件错误信息下载
def component_error_list_download(dictList):
output = BytesIO()
# 创建Excel文件,不保存,直接输出
workbook = xlsxwriter.Workbook(output, {'in_memory': True})
# 设置Sheet的名字为download
worksheet = workbook.add_worksheet('错误信息')
# 列首
title = ["部件名称", "部件编号", "位置", "错误信息"]
worksheet.write_row('A1', title)
for i in range(len(dictList)):
row = [dictList[i]["部件名称"], dictList[i]["部件编号"], dictList[i]["位置"], dictList[i]["错误信息"]]
worksheet.write_row('A' + str(i + 2), row)
workbook.close()
response = make_response(output.getvalue())
output.close()
return response
# 上传logo
def upload_logo():
fname = request.files.get('file') # 获取上传的文件
ip = request.remote_addr
if fname:
time = datetime.now().strftime("%Y%m%d%H%M%S")
path = r'/mnt/work/www/upload/logo/'
if os.path.exists(path):
new_fname = r'/mnt/work/www/upload/logo/' + time + fname.filename
fname.save(new_fname) # 保存文件到指定路径
else:
os.mkdir(path)
new_fname = r'/mnt/work/www/upload/logo/' + time + fname.filename
fname.save(new_fname) # 保存文件到指定路径
conf.set("gateway_box",'logo',"/upload/logo/" + time + fname.filename)
with open('setting.ini', 'w') as f:
conf.write(f)
conf.read('setting.ini')
return jsonify(
code=0,
msg="success",
data="/upload/logo/" + time + fname.filename
)
else:
return jsonify(
code=1,
msg="Server internal error"
)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化