Create your Gitee Account
Explore and code with more than 12 million developers. Free private repositories! :)
Sign up
文件
Clone or Download
test.py 7.21 KB
Copy Edit Raw Blame History
# import websocket
# from websocket import create_connection
# import urllib
# import json
# #from http import cookiejar
# import http.cookiejar
#
# try:
# import thread
# except ImportError:
# import _thread as thread
# import threading
# import time
#
# def on_message(ws, message):
# if message.count("@#$") > 4: # 获取报警窗口(实时报警信息)
# try:
# alert_time, alert_point, alert_name, alert_script, \
# alert_action, alert_voice, alert_type, alert_value = message.split("@#$")
# except:
# return
# """判断报警点及报警类型,并执行相关操作"""
# pass
# else: # 获取报警持续状态信息
# alert_info = (message.split("[")[1]).split("]")[0]
# pass
# print(message)
#
#
# def on_error(ws, error):
# print(error)
#
# def on_close(ws):
# print("### closed ###")
#
# def on_open(ws):
# def run(*args):
# ws.send("201500110000")
# heartbeat = threading.Timer(10, run)
# heartbeat.start()
#
# heartbeat = threading.Timer(10, run)
# heartbeat.start()
#
# def get_data():
# data_url = "http://192.168.1.10/bin/getDataBaseInfo.wk"
# headers = {'Content-Type': 'application/json'} # 设置请求头 告诉服务器请求携带的是json格式的数据
#
# cookie_filename = 'cookie.txt'
# cookie_aff = http.cookiejar.MozillaCookieJar(cookie_filename)
# cookie_aff.load(cookie_filename, ignore_discard=True, ignore_expires=True)
#
# handler = urllib.request.HTTPCookieProcessor(cookie_aff)
# opener = urllib.request.build_opener(handler)
# # 使用cookie登陆get_url
# get_data = {
# "link": "Serial1.battery@电网总功率",
# "startTime": "2018-12-17 15:00:00",
# "endTime": "2018-12-17 15:00:14"
# }
# get_request = urllib.request.Request(url=data_url, data=json.dumps(get_data).encode(encoding='UTF8'),
# headers=headers)
# get_response = opener.open(get_request)
# datainfo = json.loads(get_response.read().decode())
#
# return datainfo["data"]
#
#
# # if __name__ == "__main__":
# # websocket.enableTrace(True)
# # ws = websocket.WebSocketApp("ws://192.168.1.10:9998/",
# # on_message=on_message,
# # on_error=on_error,
# # on_close=on_close)
# # ws.on_open = on_open
# # ws.run_forever()
# if __name__ == '__main__':
# url = 'http://192.168.1.10/bin/login.wk'
# data_url = "http://192.168.1.10/bin/getDataBaseInfo.wk"
# alert_url = "http://192.168.1.10/bin/getAlertInfo.wk"
# login = {
# "name": "admin",
# "password": "e10adc3949ba59abbe56e057f20f883e"
# }
#
# headers = {'Content-Type': 'application/json'} # 设置请求头 告诉服务器请求携带的是json格式的数据
# # request = urllib.request.Request(url=url, headers=headers,
# # data=json.dumps(values).encode(encoding='UTF8')) # 需要通过encode设置编码 要不会报错
# #
# # response = urllib.request.urlopen(request) # 发送请求
# #
# #
# # logInfo = response.read().decode() # 读取对象 将返回的二进制数据转成string类型
# # print(logInfo)
#
# # headers = {'User-Agent': r'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_3) AppleWebKit/537.36' \
# # r' (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36', 'Connection': 'keep-alive'}
# cookie_filename = 'cookie.txt'
# cookie_aff = http.cookiejar.MozillaCookieJar(cookie_filename)
# handler = urllib.request.HTTPCookieProcessor(cookie_aff)
# opener = urllib.request.build_opener(handler)
#
# request = urllib.request.Request(url=url, data=json.dumps(login).encode(encoding='UTF8'), headers=headers)
# try:
# response = opener.open(request)
# except urllib.error.URLError as e:
# print(e.reason)
# cookie_aff.save(ignore_discard=True, ignore_expires=True)
#
# logInfo = json.loads(response.read().decode()) # 读取对象 将返回的二进制数据转成string类型
# print(logInfo)
# if logInfo["result"] == True:
# pass
# else:
# pass
# # for item in cookie_aff:
# # print('Name =' + item.name)
# # print('Value =' + item.value)
# # # 使用cookie登陆get_url
# # get_data = {
# # "link": "Serial1.battery@电网总功率",
# # "startTime": "2018-12-17 15:00:00",
# # "endTime": "2018-12-17 15:00:30"
# # }
# # get_request = urllib.request.Request(url=data_url, data=json.dumps(get_data).encode(encoding='UTF8'), headers=headers)
# # get_response = opener.open(get_request)
# # print(get_response.read().decode())
# time.sleep(2)
# get_data()
#
#
#
#import websocket
from threading import Thread
import time
import sys
import math
import time
import datetime
import json
import csv
#import xlrd,xlwt
# from ScadaService.dbhelper import DBHelper
# from ScadaService.models import Device_info, Electric_price,Load_data
from Common.dbhelper import DBHelper
from Common.models import PvMeta, Device_info, Electric_price, Load_data, PV_forecast
from Common.settings import *
try:
import xml.etree.cElementTree as ET
except:
import xml.etree.ElementTree as ET
def collect_link_device_points(root, link_name="modbus_转发", device_name="pc上位机"):
    """Collect the Point attributes of one device on one link, grouped by Type.

    Walks every ``Link`` element under *root* whose ``Name`` attribute equals
    *link_name*, then every ``Device`` below it whose ``Name`` equals
    *device_name*, and for each ``Type`` below that device gathers the
    attribute dictionaries of all of its ``Point`` descendants.

    Args:
        root: An ``xml.etree.ElementTree`` element to search.
        link_name: Value of the ``Name`` attribute a ``Link`` must carry.
        device_name: Value of the ``Name`` attribute a ``Device`` must carry.

    Returns:
        A dict mapping each matching ``Type`` element's ``Value`` attribute to
        the list of ``Point.attrib`` dicts found below it. If the same
        ``Value`` occurs more than once, the last occurrence wins.

    Raises:
        KeyError: If a matching ``Type`` element has no ``Value`` attribute.
    """
    points = {}
    for link in root.iter("Link"):
        # .get() lets elements without a Name attribute simply not match
        # instead of aborting the whole scan with a KeyError.
        if link.get("Name") != link_name:
            continue
        for device in link.iter("Device"):
            if device.get("Name") != device_name:
                continue
            for type_node in device.iter("Type"):
                points[type_node.attrib["Value"]] = [
                    point.attrib for point in type_node.iter("Point")
                ]
    return points


if __name__ == "__main__":
    # NOTE: a disabled one-off CSV conversion (epoch-millisecond column ->
    # '%Y-%m-%d %H:%M:%S' local time, en.csv -> en1.csv) used to sit here as
    # commented-out code; recover it from version control if it is needed.

    dbhelper = DBHelper(conf=None)
    session = dbhelper.Session()
    try:
        forecast_rows = session.query(PV_forecast.time_stamp).all()
    finally:
        # Assumes Session() yields a SQLAlchemy session (it exposes .query());
        # close it so the connection is released even if the query fails.
        session.close()

    # The query selects only PV_forecast.time_stamp, so that is the only
    # attribute each row exposes.
    # NOTE(review): the original read `device_name` from these rows, which
    # raises AttributeError. If device names were intended, query the
    # Device_info model instead -- confirm with the author.
    forecast_times = [row.time_stamp for row in forecast_rows]

    # Alternate Windows development path:
    # C:\\win-shared\\微控网关\\project\\Net1_.xml
    xml_path = "/root/Web3.0/project/Net1_.xml"
    root = ET.parse(xml_path).getroot()

    points = collect_link_device_points(root)
    print(str(points))
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化