加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
visual_chart_VAT300.py 5.13 KB
一键复制 编辑 原始数据 按行查看 历史
陈敏华 提交于 2021-04-16 14:02 . 1.增加codeisland的校筛数据导入
#! /usr/bin/env python3
# -*- coding: UTF-8 -*-
# Author: Bary Chen
import socket
import threading
import time
import httplib2
from tools import qt, network
from tools.env_paras import EXCEPTED_IP
# def get_available_ip():
# address = socket.gethostbyname_ex(socket.gethostname())[-1]
# if isinstance(address, list):
# for ip in EXCEPTED_IP:
# if ip in address:
# address.remove(ip)
# return address
# return []
def identify_vat300(server_port: int):
    """Probe the local subnets for a VAT300 device listening on *server_port*.

    For every local address reported by ``network.get_available_ip()`` that
    passes ``qt.isIP``, hosts ``.1`` through ``.255`` of the same first three
    octets are probed, one thread per host, using ``network.get_valid_ip`` as
    the thread target.

    :param server_port: TCP port used to build each ``http://<ip>:<port>`` URL.
    :return: the shared two-element result list. It starts as
        ``[None, None]`` and is handed to every worker; presumably a worker
        stores ``[device_ip, local_ip]`` in it on success -- confirm against
        ``network.get_valid_ip``.
    """
    local_addresses = network.get_available_ip()
    found = [None, None]
    guard = threading.Lock()
    request_headers = {'content-type': 'application/json'}
    workers = []
    last_prefix = []

    for local_ip in local_addresses:
        if not qt.isIP(local_ip):
            continue
        prefix = local_ip.split('.')[:3]
        # Only a prefix equal to the immediately preceding one is skipped,
        # so consecutive addresses on the same subnet are scanned once.
        if prefix == last_prefix:
            continue
        last_prefix = prefix
        subnet = '.'.join(prefix)
        for host in range(1, 256):
            candidate = subnet + '.' + str(host)
            # Never probe ourselves or an explicitly excluded address.
            if candidate == local_ip or candidate in EXCEPTED_IP:
                continue
            target_url = 'http://{}:{}'.format(candidate, server_port)
            workers.append(threading.Thread(
                target=network.get_valid_ip,
                args=(candidate, local_ip, guard, found, target_url, request_headers),
            ))

    for worker in workers:
        worker.start()
        # Tiny pause between launches, kept from the original implementation.
        time.sleep(0.00001)
    for worker in workers:
        worker.join()
    return found
# def get_valid_ip(device_ip: str, server_port: int, local_ip: str, lock: threading.Lock, detected_ip):
# if check_vat300_connection(device_ip, server_port):
# lock.acquire()
# detected_ip[0] = device_ip
# detected_ip[1] = local_ip
# lock.release()
#
#
# def check_vat300_connection(device_ip: str, server_port: int):
# if qt.isIP(device_ip):
# h = httplib2.Http(timeout=1)
# url = 'http://' + device_ip + ':' + str(server_port)
# headers = {'content-type': 'application/json'}
# try:
# resp, content = h.request(url, 'GET', headers=headers)
# h.close()
# if resp.status == 200:
# return True
# except (httplib2.HttpLib2Error, socket.error):
# return False
#
# # The code below only probes the port; it does not verify the server's response content
# # s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# # s.settimeout(1)
# # ret = s.connect_ex((device_ip, server_port))
# # s.close()
# # if ret == 0:
# # return True
# return False
def _get_vision(item: str, left='%28', right='%29'):
if left in item and item.endswith(right):
info = item.split(left)
if len(info) == 2:
v = info[1].replace(right, '')
if qt.isNumber(v):
return float(v)
return None
def parse_vat300_data_from_path(data: str):
    """Parse the request path sent back by a VAT300 device.

    This function is only meant for the path information the VAT300 sends
    back. The whole path is lower-cased before parsing, so the returned ID
    is lower-case as well.

    :param data: request path including the query string.
    :return: a dict that may contain ``'OD'`` (from the ``right`` item),
        ``'OS'`` (from the ``left`` item) and ``'ID'`` (from the ``userid``
        item). ``'OD'``/``'OS'`` hold whatever ``_get_vision`` returns. The
        dict is empty unless the path contains ``right``, ``left`` and
        ``userid``.
    """
    # Example path (shown wrapped over two lines):
    #   /?visionType=E&eyes=&right=%3C0.1%284.0%29&left=%3C0.1%284.0%29&
    #   resultTime=2021-02-09+13%3A13%3A16&userName=&userId=110101199001076583&deviceNumber=1
    msg = data.lower()
    vision = {}
    if 'right' in msg and 'left' in msg and 'userid' in msg:
        for item in msg.split('&'):
            if item.startswith('right'):
                vision['OD'] = _get_vision(item)
            elif item.startswith('left'):
                vision['OS'] = _get_vision(item)
            elif item.startswith('userid'):
                # str.lstrip('userid=') would strip any leading run of the
                # characters u/s/e/r/i/d/= and so eat into IDs such as
                # "driver01"; take everything after the first '=' instead.
                vision['ID'] = item.partition('=')[2]
    return vision
def parse_vat300_data_from_latest_queried(data: dict):
    """Convert a queried VAT300 result dict into the internal vision structure.

    :param data: dict read via its ``'userId'`` and ``'resultList'`` keys;
        each ``resultList`` entry is read via ``'model'`` and ``'vision'``.
    :return: ``{}`` when *data* is not a dict. Otherwise a dict with ``'ID'``
        plus, for each entry whose ``'model'`` is the right-eye / left-eye
        mode string, ``'OD'`` / ``'OS'`` mapped to
        ``{'Uncorrected': <result of _get_vision>}``.
    """
    if not isinstance(data, dict):
        return {}

    readings = data.get('resultList', [])
    parsed = {'ID': data.get('userId', '')}
    for entry in readings:
        raw = entry.get('vision', '')
        mode = entry.get('model', None)
        if mode == '右眼模式':  # right-eye mode
            eye = 'OD'
        elif mode == '左眼模式':  # left-eye mode
            eye = 'OS'
        else:
            continue
        parsed.setdefault(eye, {})['Uncorrected'] = _get_vision(raw, left='(', right=')')
    return parsed
if __name__ == '__main__':
    # Running this module directly does nothing. The commented-out lines below
    # are a disabled manual experiment kept for reference: they POST a small
    # JSON body to http://127.0.0.1:8810 ten times through httplib2 and print
    # the elapsed time.
    #
    # import httplib2, time, json
    #
    # start = time.time()
    # uri = 'http://127.0.0.1:8810'
    #
    # headers = {'content-type': 'application/json', 'connection':'close'}
    # h = httplib2.Http(timeout=1)
    # for i in range(10):
    #     try:
    #         resp, content = h.request(uri, 'POST', #headers=headers,
    #                                   body=json.dumps({'www': 'MM', 'mml': 'lkfdas', 'kd': 'nn'}))
    #         print(resp.status, content, 111)
    #         h.close()
    #
    #         # print(content)
    #     except (httplib2.HttpLib2Error, socket.error) as e:
    #         print(e)
    #     finally:
    #         h.close()
    #
    #     time.sleep(0.001)
    #
    # print(time.time() - start)
    # print(parse_vat300_data_from_latest_queried(dd))
    pass
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化