代码拉取完成,页面将自动刷新
# -*- coding: UTF-8 -*-
# pip install requests -i https://pypi.tuna.tsinghua.edu.cn/simple
import requests
from requests.exceptions import Timeout
import random
import string
# pip install pycryptodome -i https://pypi.tuna.tsinghua.edu.cn/simple
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from Crypto.Hash import SHA256, MD5
from Crypto.Random import get_random_bytes
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from Crypto.Cipher import PKCS1_v1_5 as Cipher_PKCS1_v1_5
import hashlib
import base64
import json
import logging
import argparse
import time
logging.basicConfig(format="%(asctime)s %(name)s:%(levelname)s:%(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
class Platform:
"""
邵武智慧社区服务平台
"""
def __init__(self, url: str) -> None:
self._url = url
self.headers = {
"Content-Length": "212",
"Content-Type": "text/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 \
Safari/537.36 Edg/116.0.1938.54"
}
self._session = requests.session()
def get_param(self, t, e, a):
o = hashlib.sha256((t + str(e)).encode()).digest()
o = base64.b64encode(o).decode()
a_md5 = MD5.new(a.encode()).hexdigest().upper()
s = SHA256.new(a_md5.encode()).digest()
s = base64.b64encode(s).decode()
iv = get_random_bytes(16)
cipher = AES.new("123".encode(), AES.MODE_CBC, iv)
s = s.encode()
ciphertext = cipher.encrypt(s)
ciphertext_base64 = base64.b64encode(ciphertext).decode()
key_base64 = base64.b64encode(cipher.key).decode()
iv_base64 = base64.b64encode(iv).decode()
return [ciphertext_base64, key_base64, iv_base64]
def login(self):
# 构造登录需要的负载参数
current_time = time.time()
milliseconds = int(round(current_time * 1000))
username = "test3"
password = "yf@2020"
p, k, i = self.get_param(username, milliseconds, password)
data = {
"i": i,
"k": k,
"n": username,
"p": p,
"t": int(round(time.time() * 1000))
}
print("负载:{0}".format(data))
response = self._session.post(self._url, headers=self.headers, json=data)
print("状态码:{0}".format(response.status_code))
print("响应数据:{0}".format(response.content.decode("utf-8")))
class Community:
"""
每个小区的智慧社区服务平台
"""
def __init__(self, community_url) -> None:
self._community_url = community_url
self._token = ""
self._encrypted_password = ""
self._face_devices = []
self._session = requests.session()
def encypt(self, password: str):
public_key_pem = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAKoR8mX0rGKLqzcWmOzbfj64K8ZIgOdH\nnzkXSOVOZbFu/TJhZ7rFAN+eaGkl3C4buccQd/EjEsj9ir7ijT7h96MCAwEAAQ=="
public_key_der = base64.b64decode(public_key_pem)
public_key = RSA.import_key(public_key_der)
cipher = PKCS1_OAEP.new(public_key)
encrypted_data = cipher.encrypt(password.encode())
return base64.b64encode(encrypted_data).decode("utf-8")
def decrypt(self, password: str):
pass
def login(self):
url = "http://{0}/prod-api/login".format(self._community_url)
self._encrypted_password = self.encypt(password="yf@2020")
headers = {
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Connection": "keep-alive",
"Content-Type": "application/json;charset=UTF-8",
"Cookie": "username=fuzhou; rememberMe=True; password={0}".format(self._encrypted_password),
"isToken": "false",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
}
data = {
"password": "yf@2020",
"username": "test3"
}
response = self._session.post(url=url, headers=headers, json=data)
logging.info("/prod-api/login 状态码:{0}, 响应内容:省略".format(response.status_code))
if response.status_code == 200:
result = response.content.decode("utf-8")
result = json.loads(result)
logging.info("code:{0}".format(result["code"]))
logging.info("msg:{0}".format(result["msg"]))
logging.info("token:{0}".format(result["token"]))
self._token = result["token"]
print(type(self._token))
print(type(self._encrypted_password))
cookie_dict = {
"rememberMe": "true",
"username": "test3",
"password": self._encrypted_password,
"Admin-Token": self._token
}
self._session.cookies.update(cookie_dict)
else:
logging.info("/prod-api/login failed {0}一体机平台登录失败".format(self._community_url))
def get_face_device_info(self, communityName: str):
url = "http://{0}/prod-api/community/device/list?pageNum=1&pageSize=8&deviceType=1".format(self._community_url)
headers = {
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Authorization": "Bearer {0}".format(self._token),
"Connection": "keep-alive",
"Content-Type": "application/json;charset=UTF-8",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
}
response = self._session.get(url=url, headers=headers)
logging.info("/prod-api/community/device/list 状态码:{0}, 响应内容:省略".format(response.status_code))
if response.status_code == 200:
result = response.content.decode("utf-8")
result = json.loads(result)
logging.info(result["msg"])
logging.info("一共{0}台门禁设备".format(result["total"]))
if len(communityName) > 0:
url = "http://{0}/prod-api/community/device/list?communityName={1}&pageNum=1&pageSize={2}&deviceType=1".format(self._community_url,
communityName,
result["total"])
response = self._session.get(url=url, headers=headers)
if response.status_code == 200:
result = response.content.decode("utf-8")
result = json.loads(result)
logging.info("{0} 小区一共{1}台门禁设备".format(communityName, result["total"]))
rows = result["rows"]
for row in rows:
logging.info("SN:{0}, IP:{1}, 位置:{2}, 在线状态:{3}".format(row["deviceSn"],
row["deviceIp"],
row["devicePosition"],
"在线" if row["onlineStatus"] == "0" else "离线"))
self._face_devices.append({"SN": row["deviceSn"], "IP": row["deviceIp"], "位置": row["devicePosition"],
"在线状态": "在线" if row["onlineStatus"] == "0" else "离线"})
return self._face_devices
else:
logging.error("/prod-api/community/device/list failed {0}".format(self._community_url))
else:
logging.error("/prod-api/community/device/list failed {0}".format(self._community_url))
# 人脸设备控制
class FaceDevice:
def __init__(self, args) -> None:
self._args = args
self._randomdata = None
self._password = "456258"
self._authorization = None
self._userid = None
self._url = ""
self._sn = ""
self._version = ""
self._global_config = None
self._session = requests.session()
self._E03C = True
self._headers = {
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Connection": "keep-alive",
"Content-Type": "application/json;charset=UTF-8",
"Cookie": "lang=zh_CN",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
}
def parse_args(self) -> None:
logging.info(self._args)
set_config = False
if self._args.url is None:
logging.error("请输入门禁设备IP")
return
self._url = self._args.url
self.login()
self.get_device_info()
self.get_global_config()
if self._args.showIP is not None:
self.show_ip(self._args.showIP)
set_config = True
if self._args.showDeviceKey is not None:
self.show_devicekey(self._args.showDeviceKey)
set_config = True
if self._args.showPeopleNum is not None:
self.show_peoplenum(self._args.showPeopleNum)
set_config = True
if self._args.identityScore is not None:
self.set_identify_scores(self._args.identityScore)
set_config = True
if self._args.displayContent is not None:
self.set_custom_displayMode(self._args.displayContent)
set_config = True
if set_config:
self.set_global_config()
self.logout()
def get_random(self):
url = "http://{0}:8090/device/GetRandom".format(self._url)
data = {
"lang": "zh_CN",
"randomlen": 8
}
try:
response = self._session.post(url=url, headers=self._headers, json=data)
logging.info("/GetRandom 状态码:{0}, 响应内容:{1}".format(response.status_code, response.content.decode("utf-8")))
if response.status_code == 200:
result = response.content.decode("utf-8")
result = json.loads(result)
self._randomdata = result["data"]["randomdata"]
else:
logging.error("/GetRandom failed")
except Timeout:
logging.error("{0} 访问超时".format(url))
def random_string(self, length=8):
characters = string.ascii_letters + string.digits
result = ''.join(random.choice(characters) for _ in range(length))
return result
def C(self, e):
"""
encryptPassword
"""
a = 24
o = len(e)
# 将 e 的长度截断为 24 个字符
e = e[:24]
# 如果 e 的长度小于 24,用字符 "U" 进行填充
while a > o:
e += "U"
o += 1
# 输出填充后的密码字符串
return e
def y(self, e):
return ''.join(list(str(self.C(e)))[:16][::-1])
def Utf8_parse(self, s):
return bytes(s.encode('utf-8'))
def encrypt_data(self, data, key, iv):
cipher = AES.new(key, AES.MODE_CBC, iv)
padded_data = pad(data, AES.block_size)
encrypted_data = cipher.encrypt(padded_data)
return encrypted_data
def gen_checkcode(self):
if self._randomdata is None:
self.get_random()
e = str(self._randomdata) + self.random_string() + self._password
o = self.Utf8_parse(self.C(self._password))
i = self.Utf8_parse(self.y(self._password))
n = self.Utf8_parse(e)
data = self.encrypt_data(n, o, i)
return base64.b64encode(data).decode("utf-8")
def login(self) -> None:
url = "http://{0}:8090/device/Login".format(self._url)
checkcode = self.gen_checkcode()
data = {
"checkcode": checkcode,
"lang": "zh_CN",
"username": "admin"
}
try:
response = self._session.post(url=url, headers=self._headers, json=data)
logging.info("/device/Login 状态码:{0}, 响应内容:省略".format(response.status_code))
if response.status_code == 200:
result = response.content.decode("utf-8")
result = json.loads(result)
self._authorization = result["data"]["authorization"]
self._userid = result["data"]["userid"]
logging.info("门禁设备{0}登录".format(self._url))
else:
logging.error("/device/Login failed")
except Timeout:
logging.error("{0} 访问超时".format(url))
def logout(self):
url = "http://{0}:8090/device/Logout".format(self._url)
data = {
"authorization": self._authorization,
"lang": "zh_CN",
"userid": self._userid
}
try:
response = self._session.post(url=url, headers=self._headers, json=data)
logging.info("/device/Logout 状态码:{0}, 响应内容:省略".format(response.status_code))
if response.status_code == 200:
result = response.content.decode("utf-8")
result = json.loads(result)
logging.info("门禁设备{0}退出:{1}".format(self._url, result))
else:
logging.error("/device/Logout failed")
except Timeout:
logging.error("{0} 访问超时".format(url))
def get_global_config(self) -> None:
url = "http://{0}:8090/device/getGlobalConfig".format(self._url)
data = {
"authorization": self._authorization,
"lang": "zh_CN",
"userid": self._userid
}
try:
response = self._session.post(url=url, headers=self._headers, json=data)
logging.info("/device/getGlobalConfig 状态码:{0}, 响应内容:省略".format(response.status_code))
if response.status_code == 200:
result = response.content.decode("utf-8")
result = json.loads(result)
self._global_config = result["data"]
logging.info("设备sn:{0}".format(self._global_config["ph_device_no"]))
if self._global_config["ph_device_no"].startswith("E03C"):
self._E03C = True
else:
self._E03C = False
logging.warn("不支持修改语音识别信息,人脸识别阈值等功能")
else:
logging.error("/device/getGlobalConfig failed")
except Timeout:
logging.error("{0} 访问超时".format(url))
def set_custom_displayMode(self, message: str) -> None:
if not self._E03C:
logging.warn("不支持修改语音识别信息,人脸识别阈值等功能")
return
if self._global_config is not None:
logging.info("修改 {0} displayModType:{1} -> 100".format(self._url, self._global_config["displayModType"]))
logging.info("修改 {0} 识别显示文字:{1} -> {2}".format(self._url, self._global_config["displayModContent"], message))
logging.info("修改 {0} ttsModType:{1}-> 100".format(self._url, self._global_config["ttsModType"]))
logging.info("修改 {0} 语音播报内容:{1} -> {2}".format(self._url, self._global_config["ttsModContent"], message))
self._global_config["displayModType"] = 100
self._global_config["displayModContent"] = message
self._global_config["ttsModType"] = 100
self._global_config["ttsModContent"] = message
self._global_config["authorization"] = self._authorization
self._global_config["userid"] = self._userid
def set_identify_scores(self, score: int) -> None:
"""
人脸识别阈值score: 0-100
"""
if not self._E03C:
logging.warn("不支持修改语音识别信息,人脸识别阈值等功能")
return
if self._global_config is not None:
logging.info("修改 {0} 人脸识别阈值:{1} -> {2}".format(self._url, self._global_config["identifyScores"], score))
self._global_config["identifyScores"] = score
self._global_config["authorization"] = self._authorization
self._global_config["userid"] = self._userid
def show_ip(self, type: int) -> None:
"""
type: 1---不显示,2---显示
"""
if not self._E03C:
logging.warn("不支持修改语音识别信息,人脸识别阈值等功能")
return
if type != 1 and type != 2:
logging.info("type:{0} 值不合法,1---不显示,2---显示".format(type))
return
if self._global_config is not None:
logging.info("修改 {0} 是否显示IP:{1} -> {2}".format(self._url, "显示" if self._global_config["showIp"] == 2 else "不显示",
"显示" if type == 2 else "不显示"))
self._global_config["showIp"] = type
self._global_config["authorization"] = self._authorization
self._global_config["userid"] = self._userid
def show_devicekey(self, type: int) -> None:
"""
type: 1---不显示,2---显示
"""
if not self._E03C:
logging.warn("不支持修改语音识别信息,人脸识别阈值等功能")
return
if type != 1 and type != 2:
logging.info("type:{0} 值不合法,1---不显示,2---显示".format(type))
return
if self._global_config is not None:
logging.info("修改 {0} 是否显示devicekey:{1} -> {2}".format(self._url, "显示" if self._global_config["showDeviceKey"] == 2 else "不显示",
"显示" if type == 2 else "不显示"))
self._global_config["showDeviceKey"] = type
self._global_config["authorization"] = self._authorization
self._global_config["userid"] = self._userid
def show_peoplenum(self, type: int) -> None:
"""
type: 1---不显示,2---显示
"""
if not self._E03C:
logging.warn("不支持修改语音识别信息,人脸识别阈值等功能")
return
if type != 1 and type != 2:
logging.info("type:{0} 值不合法,1---不显示,2---显示".format(type))
return
if self._global_config is not None:
logging.info("修改 {0} 是否显示peopleNum:{1} -> {2}".format(self._url, "显示" if self._global_config["showPeopleNum"] == 2 else "不显示",
"显示" if type == 2 else "不显示"))
self._global_config["showPeopleNum"] = type
self._global_config["authorization"] = self._authorization
self._global_config["userid"] = self._userid
def set_identify_distance(self, distance: int) -> None:
"""
TODO
"""
pass
def set_global_config(self) -> None:
if not self._E03C:
logging.warn("不支持修改语音识别信息,人脸识别阈值等功能")
return
url = "http://{0}:8090/device/setGlobalConfig".format(self._url)
headers = {
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Connection": "keep-alive",
"Content-Length": "4529",
"Content-Type": "application/json;charset=UTF-8",
"Cookie": "lang=zh_CN",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
}
try:
response = requests.post(url=url, headers=headers, json=self._global_config)
logging.info("/device/setGlobalConfig 状态码:{0}, 响应内容:{1}".format(response.status_code, response.content.decode("utf-8")))
if response.status_code == 200:
result = response.content.decode("utf-8")
result = json.loads(result)
code = result["code"]
if code == "WEB_EXP-1011":
logging.warn(result["msg"] + ",请等待门禁设备升级完成.")
elif code == "WEB_EXP-1008":
logging.error(result["msg"] + ",认证失败.")
elif code == "WEB_SUS-0":
success = result["success"]
if success:
logging.info("配置修改成功")
else:
logging.error("/device/setGlobalConfig failed")
except Timeout:
logging.error("{0} 访问超时".format(url))
def get_device_info(self) -> None:
url = "http://{0}:8090/device/GetDevInfo".format(self._url)
headers = {
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
"Connection": "keep-alive",
"Content-Length": "35",
"Content-Type": "application/json;charset=UTF-8",
"Cookie": "lang=zh_CN",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
}
data = {
"authorization": self._authorization,
"lang": "zh_CN",
"userid": self._userid
}
try:
response = self._session.post(url=url, headers=headers, json=data)
logging.info("/device/GetDevInfo 状态码:{0}, 响应内容:{1}".format(response.status_code, response.content.decode("utf-8")))
if response.status_code == 200:
result = response.content.decode("utf-8")
result = json.loads(result)
code = result["code"]
if code == "WEB_EXP-1011":
logging.warn(result["msg"] + ",请等待门禁设备升级完成.")
elif code == "WEB_EXP-1008":
logging.error(result["msg"] + ",认证失败.")
elif code == "WEB_SUS-0":
logging.info("当前门禁设备IP:{0}".format(result["data"]["ip"]))
logging.info("当前门禁设备SN:{0}".format(result["data"]["SN"]))
logging.info("当前门禁设备类型:{0}".format(result["data"]["devtype"]))
logging.info("当前门禁设备版本:{0}".format(result["data"]["hardware"]))
self._sn = result["data"]["SN"]
self._version = result["data"]["hardware"]
if self._sn.startswith("E03C"):
self._E03C = True
logging.info("支持修改语音识别信息,人脸识别阈值等功能")
else:
self._E03C = False
logging.warn("不支持修改语音识别信息,人脸识别阈值等功能")
else:
logging.error("/device/GetDevInfo failed")
except Timeout:
logging.error("{0} 访问超时".format(url))
if __name__ == "__main__":
# platform = Platform(url="http://183.253.179.151:8888/api/client/login")
# platform.login()
community = Community(community_url="183.253.179.151")
community.encypt(password="yf@2020")
community.login()
face_devices = community.get_face_device_info(communityName="花香小区")
# 目前一共有327台门禁设备,83个小区247台门禁设备SN以E03开头,27个小区80台门禁设备SN以84开头
parser = argparse.ArgumentParser(description="""
人脸设备配置修改
可以登录政务外网的2个平台网站,各个小区的智慧社区平台以及门禁设备的网页端
确保一体机可以连通,并且从一体机可以ping通门禁设备就可以访问到门禁设备
对于SN为E03开头的门禁设备,可以配置识别成功信息,人脸阈值,识别距离,是否显示IP等信息
对于SN为84开头的门禁设备,门禁设备暂时没有找到对应接口来修改配置
对于SN为E03和84开头的门禁设备,都可以进行远程升级,远程升级功能还未实现......""")
parser.add_argument("--url", type=str, action="store", help="人脸设备的IP")
parser.add_argument("--showIP", type=int, action="store", help="是否显示人脸设备的IP(1---不显示,2---显示)")
parser.add_argument("--showDeviceKey", type=int, action="store", help="是否显示人脸设备的SN(1---不显示,2---显示)")
parser.add_argument("--showPeopleNum", type=int, action="store", help="是否显示人脸设备的存储人数(1---不显示,2---显示)")
parser.add_argument("--displayContent", type=str, action="store", help="人脸设备识别成功展示的信息")
parser.add_argument("--identityScore", type=int, action="store", help="人脸设备的识别阈值(0-100)")
args = parser.parse_args()
device = FaceDevice(args=args)
device.parse_args()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。