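# NOTE: web-page residue captured with the source, not part of the module ("Code pull complete; the page will refresh automatically.").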
import json
import logging
import os
import platform
import time
import uuid
from configparser import ConfigParser
from datetime import datetime

import jwt
import OpenSSL.crypto
import pymysql.cursors
import requests
from flask import request, jsonify
from werkzeug.security import generate_password_hash, check_password_hash
# Kept last so the names it exports keep the same precedence as before.
from pynvml import *
# 读取配置文件
conf = ConfigParser()
# ConfigParser.read silently skips files that do not exist.
conf.read(["setting.ini"], encoding="utf-8")
# Logging: emit everything from DEBUG upwards with timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
# 数据库连接
def mysql():
    """Open a new connection to the ``system_setup`` MySQL database.

    Host, port, user and password are read from the ``mysql_settings``
    section of the loaded configuration. The connection uses
    ``pymysql.cursors.DictCursor`` as its cursor class.

    Returns:
        The connection object returned by ``pymysql.connect``; the caller
        is expected to close it.
    """
    section = "mysql_settings"
    connect_kwargs = {
        "host": conf.get(section, "host"),
        "port": int(conf.get(section, "port")),
        "user": conf.get(section, "user"),
        "password": conf.get(section, "password"),
        "database": 'system_setup',
        "cursorclass": pymysql.cursors.DictCursor,
    }
    return pymysql.connect(**connect_kwargs)
# 内存
# redis = redis.Redis(host='127.0.0.1', port=6379,db=0,password='jizhi@123')
# 用户新增
def userAdd():
    """Create a new user from the JSON request body.

    The bearer token from the ``Authorization`` header is validated first.
    The body must contain ``userName``, ``password`` and ``state``
    (1 = normal, 2 = disabled); ``remark`` is optional and defaults to ''.

    Returns:
        A JSON response. ``code`` is 0 on success, 1 for a missing token,
        validation failures or server errors, and 401 when the token is
        rejected.
    """
    response = getDecodeToken(request.headers.get("Authorization", "")[7:])
    token = json.loads(response.data)
    if token['code'] in (1, 401):
        return jsonify(msg=token['msg'], code=token['code'])

    # pymysql.connect() already returns an open connection, so no extra
    # connect() call is needed.
    mydb = mysql()
    cursor = mydb.cursor()
    try:
        jsons = request.json
        if not all(key in jsons for key in ('userName', 'password', 'state')):
            return jsonify(code=1, msg="缺少必要参数userName、password、state")

        # Validate every field before touching the database.
        user_name = jsons['userName']
        if user_name is None:
            return jsonify(code=1, msg="userName 不能为null")
        password = jsons['password']
        if password is None or password == '':
            return jsonify(code=1, msg="password 不能为null")
        # State: 1 = normal, 2 = disabled.
        state = jsons['state']
        if state is None or state == '':
            return jsonify(code=1, msg="state 不能为null")

        # Parameterized queries: request data is never spliced into SQL text.
        cursor.execute("SELECT user_name FROM `user` WHERE user_name = %s", (user_name,))
        if cursor.fetchall():
            return jsonify(code=1, msg="用户已存在")

        # Store only the salted hash of the password.
        # NOTE(review): pbkdf2:sha1 is kept for compatibility with existing rows.
        password_hash = generate_password_hash(password, method='pbkdf2:sha1', salt_length=10)
        creation_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        remark = jsons['remark'] if 'remark' in jsons else ''
        user_id = str(uuid.uuid4())
        cursor.execute(
            "insert into `user` (id,user_name,state,password,creation_date,remark) "
            "values(%s,%s,%s,%s,%s,%s)",
            (user_id, user_name, state, password_hash, creation_date, remark),
        )
        mydb.commit()
    except Exception:
        logging.exception("userAdd failed")
        return jsonify(code=1, msg="Server internal error")
    finally:
        cursor.close()  # close the cursor first
        mydb.close()  # then the connection
    return jsonify(msg="success", code=0)
# 用户编辑
def userUpdate():
    """Update an existing user from the JSON request body.

    The bearer token from the ``Authorization`` header is validated first.
    The body must contain ``id``, ``userName``, ``password`` and ``state``
    (1 = normal, 2 = disabled); ``remark`` is optional and defaults to ''.
    The stored ``creation_date`` is overwritten with the current time, as
    before.

    Returns:
        A JSON response. ``code`` is 0 on success, 1 for a missing token,
        validation failures, an unknown id, a name clash or server errors,
        and 401 when the token is rejected.
    """
    response = getDecodeToken(request.headers.get("Authorization", "")[7:])
    token = json.loads(response.data)
    if token['code'] in (1, 401):
        return jsonify(msg=token['msg'], code=token['code'])

    # pymysql.connect() already returns an open connection, so no extra
    # connect() call is needed.
    mydb = mysql()
    cursor = mydb.cursor()
    try:
        jsons = request.json
        if not all(key in jsons for key in ('userName', 'password', 'state', 'id')):
            return jsonify(code=1, msg="缺少必要参数id、userName、password、state")

        # Validate every field before touching the database.
        user_id = jsons['id']
        user_name = jsons['userName']
        if user_name is None:
            return jsonify(code=1, msg="userName 不能为null")
        password = jsons['password']
        if password is None or password == '':
            return jsonify(code=1, msg="password 不能为null")
        # State: 1 = normal, 2 = disabled.
        state = jsons['state']
        if state is None or state == '':
            return jsonify(code=1, msg="state 不能为null")

        # Parameterized queries: request data is never spliced into SQL text.
        cursor.execute("SELECT user_name FROM `user` WHERE id = %s", (user_id,))
        current = cursor.fetchone()
        if current is None:
            # Unknown id: report it instead of failing on current["user_name"].
            return jsonify(code=1, msg="用户不存在")
        if user_name != current["user_name"]:
            # The name is changing, so it must not collide with another user.
            cursor.execute("SELECT user_name FROM `user` WHERE user_name = %s", (user_name,))
            if cursor.fetchall():
                return jsonify(code=1, msg="用户已存在")

        # Store only the salted hash of the password.
        password_hash = generate_password_hash(password, method='pbkdf2:sha1', salt_length=10)
        creation_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        remark = jsons['remark'] if 'remark' in jsons else ''
        cursor.execute(
            "UPDATE `user` SET user_name = %s,state = %s ,password = %s,"
            "creation_date = %s,remark = %s where id = %s",
            (user_name, state, password_hash, creation_date, remark, user_id),
        )
        mydb.commit()
    except Exception:
        logging.exception("userUpdate failed")
        return jsonify(code=1, msg="Server internal error")
    finally:
        cursor.close()  # close the cursor first
        mydb.close()  # then the connection
    return jsonify(msg="success", code=0)
# 用户删除
def userDel():
    """Delete the user whose ``id`` is given in the JSON request body.

    The bearer token from the ``Authorization`` header is validated first.

    Returns:
        A JSON response. ``code`` is 0 on success (whether or not a row
        matched), 1 for a missing token, a missing ``id`` or server errors,
        and 401 when the token is rejected.
    """
    response = getDecodeToken(request.headers.get("Authorization", "")[7:])
    token = json.loads(response.data)
    if token['code'] in (1, 401):
        return jsonify(msg=token['msg'], code=token['code'])

    # pymysql.connect() already returns an open connection, so no extra
    # connect() call is needed.
    mydb = mysql()
    cursor = mydb.cursor()
    try:
        jsons = request.json
        if 'id' not in jsons:
            return jsonify(code=1, msg="缺少必要参数id")
        # Parameterized query: the id is never spliced into SQL text.
        cursor.execute("DELETE FROM `user` WHERE id = %s", (jsons['id'],))
        mydb.commit()
    except Exception:
        logging.exception("userDel failed")
        return jsonify(code=1, msg="Server internal error")
    finally:
        cursor.close()  # close the cursor first
        mydb.close()  # then the connection
    return jsonify(msg="success", code=0)
# 用户分页列表
def userListPage():
    """Return one page of users, newest first, with paging metadata.

    The bearer token from the ``Authorization`` header is validated first.
    Query-string parameters: ``page`` and ``limit`` are required;
    ``userName`` (substring match) and ``state`` (exact match) are optional
    filters.

    Returns:
        A JSON response. On success ``data`` holds ``totalSize``, ``pages``
        (at least 1), ``pageSize``, ``Items``, ``currentPage`` and
        ``currentSize`` (number of items actually on this page). ``code``
        is 1 for a missing token, missing parameters or server errors, and
        401 when the token is rejected.
    """
    response = getDecodeToken(request.headers.get("Authorization", "")[7:])
    token = json.loads(response.data)
    if token['code'] in (1, 401):
        return jsonify(msg=token['msg'], code=token['code'])

    # pymysql.connect() already returns an open connection, so no extra
    # connect() call is needed.
    mydb = mysql()
    cursor = mydb.cursor()
    try:
        args = request.args
        if 'page' not in args or 'limit' not in args:
            return jsonify(code=1, msg="缺少必要参数page、limit")
        page = args.get("page")
        limit = args.get("limit")
        page_no = int(page)
        page_size = int(limit)

        # Build the optional filters with placeholders so that request
        # values are never spliced into the SQL text.
        filters = []
        params = []
        user_name = args.get("userName")
        if user_name is not None:
            filters.append("AND user_name like %s")
            params.append("%" + user_name + "%")
        state = args.get("state")
        if state is not None:
            filters.append("AND state = %s")
            params.append(state)
        sql = "SELECT * from `user` where 1=1 " + " ".join(filters) + " order by creation_date desc"
        cursor.execute(sql, params or None)
        rows = cursor.fetchall()

        start = page_size * (page_no - 1)
        items = [
            {
                'id': row['id'],
                'userName': row['user_name'],
                'state': row['state'],
                'creationDate': row['creation_date'],
                'remark': row['remark'],
            }
            for row in rows[start:start + page_size]
        ]

        total = len(rows)
        rlt_data = {
            "totalSize": total,
            # Ceiling division: an exact multiple of the page size must not
            # report an extra, empty page. An empty result still reports 1.
            "pages": max(1, (total + page_size - 1) // page_size),
            "pageSize": limit,
            "Items": items,
            "currentPage": page,
            # Always the real number of rows on this page.
            "currentSize": len(items),
        }
    except Exception:
        logging.exception("userListPage failed")
        return jsonify(code=1, msg="Server internal error")
    finally:
        cursor.close()  # close the cursor first
        mydb.close()  # then the connection
    return jsonify(msg="success", code=0, data=rlt_data)
# 登录
def login():
    """Authenticate a user and issue a token.

    The JSON body must contain ``userName`` and ``password``. On success the
    response ``data`` carries ``token``, ``permit_check`` ('yes'/'no') and
    ``permission``.

    Returns:
        A JSON response. ``code`` is 0 on success and 1 for missing
        parameters, an unknown user, a wrong password or a server error.
    """
    # pymysql.connect() already returns an open connection, so no extra
    # connect() call is needed.
    mydb = mysql()
    cursor = mydb.cursor()
    try:
        jsons = request.json
        if 'userName' not in jsons or 'password' not in jsons:
            return jsonify(code=1, msg="缺少必要参数userName,password")

        # Parameterized query: the user name is never spliced into SQL text.
        cursor.execute("SELECT * from `user` WHERE user_name = %s", (jsons.get("userName"),))
        user = cursor.fetchone()
        if user is None:
            return jsonify(code=1, msg="用户不存在")
        # NOTE(review): the user's `state` (1 normal / 2 disabled) is not
        # checked here, so disabled accounts can still log in — confirm intent.
        if not check_password_hash(user["password"], jsons.get("password")):
            return jsonify(code=1, msg="密码错误")

        print("登录成功")
        token = createToken(user)
        if isinstance(token, bytes):
            # Older PyJWT releases return bytes, which jsonify cannot serialise.
            token = token.decode(encoding='utf-8', errors='strict')
        rlt_data = {
            "token": token,
            # License check result, normalised to 'yes' / 'no'.
            "permit_check": 'yes' if permit_check() == 'yes' else 'no',
            "permission": license_permission_analysis(),
        }
        return jsonify(msg="success", code=0, data=rlt_data)
    except Exception as e:
        logging.debug("---Exception---%s" % e)
        return jsonify(code=1, msg="Server internal error")
    finally:
        cursor.close()  # close the cursor first
        mydb.close()  # then the connection
# 许可校验
def permit_check():
    """Report whether the device license is valid.

    Certificate verification is currently disabled (the former
    implementation is retained below as a comment), so every call reports
    a valid license.

    Returns:
        The string ``"yes"``.
    """
    # Disabled reference implementation:
    #   cmd = "openssl verify -CAfile rootCA.crt -partial_chain dev.crt"
    #   result = os.popen(cmd).read()
    #   if result != '' and 'OK' in result:
    #       cert = OpenSSL.crypto.load_certificate(
    #           OpenSSL.crypto.FILETYPE_PEM, open("dev.crt").read())
    #       certSubject = cert.get_subject()
    #       for item in certSubject.get_components():
    #           if item[0].decode("utf-8") == 'CN':
    #               board_info = "{" + str(main_board_info()) + "}.web"
    #               if item[1].decode("utf-8") != board_info:
    #                   return 'no'
    #       return 'yes'
    #   else:
    #       return 'no'
    license_state = "yes"
    return license_state
# 许可权限解析
def license_permission_analysis():
    """Return the permission string granted by the device license.

    Certificate parsing is currently disabled (the former implementation is
    retained below as a comment), so every call returns the fixed value.

    Returns:
        The string ``"yes"``.
    """
    # Disabled reference implementation:
    #   cmd = "openssl verify -CAfile rootCA.crt -partial_chain dev.crt"
    #   result = os.popen(cmd).read()
    #   if result != '' and 'OK' in result:
    #       cert = OpenSSL.crypto.load_certificate(
    #           OpenSSL.crypto.FILETYPE_PEM, open("dev.crt").read())
    #       certSubject = cert.get_subject()
    #       for item in certSubject.get_components():
    #           if item[0].decode("utf-8") == 'OU':
    #               ou = item[1].decode("utf-8")
    #               return ou[1:len(ou) - 5]
    #       return 'yes'
    #   else:
    #       return 'no'
    permission = "yes"
    return permission
# 创建token
def createToken(user):
    """Build a signed HS256 JWT for *user*.

    The token carries three claims: ``exp`` (now + 7200 seconds), ``iss``
    ('jizhi') and ``userId`` (taken from ``user["id"]``).

    Args:
        user: Mapping with at least an ``id`` entry.

    Returns:
        Whatever ``jwt.encode`` returns for the claims above.
    """
    # NOTE(review): the signing secret is hard-coded here and in
    # getDecodeToken; consider moving it to configuration.
    expires_at = time.time() + 7200  # expiry timestamp (Expiration Time)
    claims = dict(
        exp=expires_at,
        iss='jizhi',  # issuer, verified again when the token is decoded
        userId=user["id"],
    )
    return jwt.encode(claims, 'jizhi@123', algorithm='HS256')
# 解析token
def getDecodeToken(token):
    """Decode and verify a JWT, returning the outcome as a JSON response.

    Args:
        token: The encoded token string; '' means no token was supplied.

    Returns:
        A JSON response with ``code`` 1 ("No Token") for an empty token,
        ``code`` 0 and the decoded claims in ``data`` on success, or
        ``code`` 401 when decoding raises any exception.
    """
    if token == '':
        return jsonify(msg="No Token", code=1)
    # A license check (permit_check) used to run at this point and is
    # currently disabled.
    try:
        claims = jwt.decode(token, 'jizhi@123', issuer='jizhi', algorithms=['HS256'])
        print(claims)
        return jsonify(msg="success", code=0, data=claims)
    except Exception:
        # Every failure is reported with the same "token expired" message.
        return jsonify(code=401, msg='token 已过期')
def token_check(token):
    """Ask the local token-decoding endpoint to validate *token*.

    Sends a POST to ``http://127.0.0.1:5001/system/getDecodeToken`` with the
    token as a bearer credential.

    Args:
        token: The encoded token string.

    Returns:
        The endpoint's JSON reply, parsed into Python objects.

    Raises:
        requests.exceptions.RequestException: If the request fails or does
            not complete within 5 seconds.
    """
    headers = {'Content-Type': 'application/json', 'Authorization': 'Bearer ' + token}
    url = "http://127.0.0.1:5001/system/getDecodeToken"
    # A timeout keeps a stalled endpoint from blocking the caller forever;
    # 5 seconds matches the other HTTP calls in this module.
    reply = requests.post(url, headers=headers, timeout=5)
    return json.loads(reply.text)
# 获取网络信息
def ethList():
    """Return the network interface list reported by the local device service.

    The bearer token from the ``Authorization`` header is validated first,
    then ``http://127.0.0.1:8001/rest/v1/dev/ethList`` is queried with a
    5 second timeout.

    Returns:
        A JSON response with the service's ``data`` field on success;
        ``code`` 1 or 401 for token problems, ``code`` 1 when the request
        or JSON parsing fails.
    """
    response = getDecodeToken(request.headers.get("Authorization", "")[7:])
    token = json.loads(response.data)
    status = token['code']
    if status == 1 or status == 401:
        return jsonify(msg=token['msg'], code=1 if status == 1 else status)

    try:
        reply = requests.get("http://127.0.0.1:8001/rest/v1/dev/ethList", timeout=5)
        responseInfo = json.loads(reply.text)
        logging.debug("response:%s" % responseInfo)
    except Exception as e:
        print(e)
        return jsonify(code=1, msg="Server internal error")
    return jsonify(msg="success", code=0, data=responseInfo['data'])
# 配置网络信息
def updateEth():
    """Forward a network configuration change to the local device service.

    The bearer token from the ``Authorization`` header is validated first.
    The JSON body must contain ``eth_name``, ``gateway``, ``dns``,
    ``ip_addr`` and ``mask``; it is posted unchanged to
    ``http://127.0.0.1:8001/rest/v1/dev/ethList`` with a 5 second timeout.

    Returns:
        A JSON response with the service's ``data`` field on success;
        ``code`` 1 or 401 for token problems, ``code`` 1 for missing
        parameters or when the request or reply handling fails.
    """
    response = getDecodeToken(request.headers.get("Authorization", "")[7:])
    token = json.loads(response.data)
    if token['code'] in (1, 401):
        return jsonify(msg=token['msg'], code=token['code'])

    try:
        jsons = request.json
        required = ('eth_name', 'gateway', 'dns', 'ip_addr', 'mask')
        if not all(key in jsons for key in required):
            # Without this early return the final response would reference
            # an unbound responseInfo and crash with a NameError.
            return jsonify(code=1, msg="缺少必要参数eth_name、gateway、dns、ip_addr、mask")
        headers = {
            "Content-Type": "application/json; charset=UTF-8"
        }
        url = "http://127.0.0.1:8001/rest/v1/dev/ethList"
        reply = requests.post(url, data=json.dumps(jsons), headers=headers, timeout=5)
        responseInfo = json.loads(reply.text)
        logging.debug("response:%s" % responseInfo)
        # Read the payload inside the try so a reply without 'data' is
        # reported as an error response instead of an unhandled KeyError.
        data = responseInfo['data']
    except Exception as e:
        print(e)
        return jsonify(code=1, msg="Server internal error")
    return jsonify(msg="success", code=0, data=data)
# 获取系统信息
def systemInfo():
    """Return the gateway's descriptive settings.

    The bearer token from the ``Authorization`` header is validated first.
    The configuration file is re-read on every call; all values come from
    its ``gateway_box`` section, except ``gatewayPermit``, which is the
    value returned by ``main_board_info()``.

    Returns:
        A JSON response whose ``data`` holds ``gatewayNumber``,
        ``gatewayName``, ``gatewayType``, ``gatewaySerialNumber``,
        ``gatewayEdition``, ``gatewayPermit`` and ``logo``; ``code`` 1 or
        401 for token problems, ``code`` 1 when reading the settings fails.
    """
    response = getDecodeToken(request.headers.get("Authorization", "")[7:])
    token = json.loads(response.data)
    if token['code'] in (1, 401):
        return jsonify(msg=token['msg'], code=token['code'])

    try:
        # Re-read with the same encoding used when the file is first loaded,
        # so non-ASCII values do not depend on the platform default.
        conf.read('setting.ini', encoding='utf-8')
        section = "gateway_box"
        jsonParam = {
            "gatewayNumber": conf.get(section, "gateway_number"),
            "gatewayName": conf.get(section, "gateway_name"),
            "gatewayType": conf.get(section, "gateway_type"),
            "gatewaySerialNumber": conf.get(section, "gateway_serialnumber"),
            "gatewayEdition": conf.get(section, "gateway_edition"),
            # Taken from the hardware identifier rather than the
            # gateway_permit setting.
            "gatewayPermit": main_board_info(),
            "logo": conf.get(section, "logo"),
        }
    except Exception as e:
        print(e)
        return jsonify(code=1, msg="Server internal error")
    return jsonify(msg="success", code=0, data=jsonParam)
# 获取系统状态
def systemStateList():
    """Return the system status reported by the local device service.

    The bearer token from the ``Authorization`` header is validated first,
    then ``http://127.0.0.1:8001/rest/v1/dev/info`` is queried with a
    5 second timeout.

    Returns:
        A JSON response with the service's ``data`` field on success;
        ``code`` 1 or 401 for token problems, ``code`` 1 when the request
        or JSON parsing fails.
    """
    response = getDecodeToken(request.headers.get("Authorization", "")[7:])
    token = json.loads(response.data)
    status = token['code']
    if status == 1 or status == 401:
        return jsonify(msg=token['msg'], code=1 if status == 1 else status)

    try:
        reply = requests.get("http://127.0.0.1:8001/rest/v1/dev/info", timeout=5)
        responseInfo = json.loads(reply.text)
        logging.debug("response:%s" % responseInfo)
    except Exception as e:
        print(e)
        return jsonify(code=1, msg="Server internal error")
    return jsonify(msg="success", code=0, data=responseInfo['data'])
# 获取主板序列号
def get_main_board_info():
    """Return the hardware identifier as a JSON response.

    Delegates to ``main_board_info()`` instead of repeating its
    command-running logic, so the two can never drift apart.

    Returns:
        A JSON response with ``code`` 0 and the identifier string in
        ``data``.
    """
    return jsonify(msg="success", code=0, data=main_board_info())
# 获取主板序列号
def main_board_info():
    """Build a hardware identifier from the board serial and the CPU id.

    Two shell commands, chosen by ``platform.system()``, are run through
    ``os.popen``. The serial output has the text 'SerialNumber', spaces and
    dashes removed; the CPU output has spaces removed. Both intermediate
    values are printed.

    Returns:
        The cleaned serial string followed by the cleaned CPU string.
    """
    per_platform = {
        "Darwin": (
            "nsenter -m -n -t 1 system_profiler SPHardwareDataType | grep Serial |awk -F': ' '{print $2}'",
            "nsenter -m -n -t 1 system_profiler SPHardwareDataType | grep Serial |awk -F': ' '{print $2}'",
        ),
        "Windows": (
            'nsenter -m -n -t 1 wmic bios get serialnumber',
            "nsenter -m -n -t 1 wmic cpu get ProcessorId",
        ),
    }
    # Any other platform uses the DMI-based commands.
    fallback = (
        "nsenter -m -n -t 1 cat /sys/devices/virtual/dmi/id/product_uuid",
        "nsenter -m -n -t 1 dmidecode -t 4 | grep ID |sort -u |awk -F': ' '{print $2}'",
    )
    serial_number_cmd, cpu_cmd = per_platform.get(platform.system(), fallback)

    raw_serial = os.popen(serial_number_cmd).read()
    trimmed = raw_serial.replace('SerialNumber', '').strip()
    compact = trimmed.replace(" ", "")
    print(compact)
    serial_number = compact.replace("-", "")

    cpu = os.popen(cpu_cmd).read().replace(' ', '').strip()
    result = serial_number + cpu
    print(result)
    return result
# 获取配置文件(数据库)
def get_profile():
    """Return the download path of the database backup archive, if present.

    Returns:
        A JSON response with ``code`` 0 and the relative download path when
        ``/mnt/work/www/upload/mysql5.7.tar.gz`` exists, otherwise ``code``
        1 with a message asking the user to run a backup first.
    """
    archive_present = os.path.exists("/mnt/work/www/upload/mysql5.7.tar.gz")
    if not archive_present:
        return jsonify(code=1, msg="请先备份再导出!")
    return jsonify(msg="success", code=0, data="/upload/mysql5.7.tar.gz")
# 备份配置文件(数据库)
def backup_profile():
    """Archive the MySQL directory and copy the archive to the upload folder.

    Runs two shell commands in order: a ``tar`` of ``/usr/local/mysql5.7/``
    and a ``cp`` of the resulting archive to ``/mnt/work/www/upload``.

    Returns:
        A JSON response with ``code`` 0. The commands' output and exit
        status are not inspected, so success is reported unconditionally.
    """
    steps = (
        "nsenter -m -n -t 1 sudo -i tar -zcvf /usr/local/mysql5.7.tar.gz /usr/local/mysql5.7/",
        "nsenter -m -n -t 1 sudo -i cp /usr/local/mysql5.7.tar.gz /mnt/work/www/upload",
    )
    for command in steps:
        # Reading to EOF waits for the command to finish before the next one.
        os.popen(command).read()
    return jsonify(msg="success", code=0)
# Invidia信息
def nvidia_info():
    """Collect driver and per-GPU information through NVML.

    Requires the ``nvidia-ml-py`` package (``pip install nvidia-ml-py``).

    Returns:
        A dict with ``state`` (False if any NVML call raised),
        ``nvidia_version``, ``nvidia_count`` and ``gpus``. Each entry of
        ``gpus`` holds ``gpu_name``, ``total``, ``free``, ``used``,
        ``temperature`` and ``powerStatus``. Values gathered before a
        failure are kept.
    """
    nvidia_dict = {"state": True, "nvidia_version": "", "nvidia_count": 0, "gpus": []}
    try:
        nvmlInit()
        nvidia_dict["nvidia_version"] = nvmlSystemGetDriverVersion()
        gpu_total = nvmlDeviceGetCount()
        nvidia_dict["nvidia_count"] = gpu_total
        for index in range(gpu_total):
            handle = nvmlDeviceGetHandleByIndex(index)
            memory = nvmlDeviceGetMemoryInfo(handle)
            nvidia_dict['gpus'].append({
                "gpu_name": nvmlDeviceGetName(handle),
                "total": memory.total,
                "free": memory.free,
                "used": memory.used,
                "temperature": f"{nvmlDeviceGetTemperature(handle, 0)}℃",
                "powerStatus": nvmlDeviceGetPowerState(handle),
            })
    except (NVMLError, Exception):
        # Any failure just marks the result as unavailable.
        nvidia_dict["state"] = False
    finally:
        try:
            nvmlShutdown()
        except:
            # Best-effort shutdown: errors here are deliberately ignored.
            pass
    return nvidia_dict
# 获取gpu使用率
def check_gpu_mem_usedRate():
    """Return the memory usage ratio of GPU 0 as a JSON response.

    Returns:
        A JSON response with ``code`` 0. ``data`` is used/total for the
        first GPU rounded to two decimals, or 0 when no GPU information is
        available.
    """
    info = nvidia_info()
    print(info)
    gpus = info['gpus']
    # nvidia_info() can report a non-zero count with an empty GPU list when
    # an NVML call fails part-way, so check the list itself as well.
    if info['nvidia_count'] == 0 or not gpus:
        return jsonify(msg="success", code=0, data=0)

    used = gpus[0]['used']
    tot = gpus[0]['total']
    # Guard against a reported total of zero.
    rate = used / tot if tot else 0.0
    print(f"GPU0 used: {used}, tot: {tot}, 使用率:{rate}")
    max_rate = max(0.0, rate)
    print("GPU0 最大使用率:", max_rate)
    return jsonify(msg="success", code=0, data=round(max_rate, 2))
# 写入系统配置
def set_system_config():
    """Update gateway settings from the JSON request body and persist them.

    Each of ``gateway_number``, ``gateway_name``, ``gateway_type``,
    ``gateway_serialnumber`` and ``gateway_edition`` that is present and
    not '' is written to the ``gateway_box`` section. The configuration is
    then saved to ``setting.ini`` and re-read.

    Returns:
        A JSON response with ``code`` 0 on success, or ``code`` 1 when
        anything raises.
    """
    # NOTE(review): unlike the other mutating endpoints in this module,
    # this one does not validate the bearer token — confirm that is intended.
    try:
        jsons = request.json
        options = (
            'gateway_number',
            'gateway_name',
            'gateway_type',
            'gateway_serialnumber',
            'gateway_edition',
        )
        # Apply every supplied field; the fields are independent, so one
        # being present must not prevent the others from being saved.
        for option in options:
            if option in jsons and jsons.get(option) != '':
                conf.set("gateway_box", option, jsons.get(option))
        # Write and re-read with the same encoding used at load time.
        with open('setting.ini', 'w', encoding='utf-8') as f:
            conf.write(f)
        conf.read('setting.ini', encoding='utf-8')
        return jsonify(msg="success", code=0)
    except Exception as e:
        print(e)
        return jsonify(code=1, msg="Server internal error")
# NOTE: web-page residue captured with the source, not part of the module (content-moderation notice):
# "Content here may be unsuitable for display and is hidden; if you believe it is compliant, you may submit an appeal."