#!/usr/bin/env python
# -*- coding: utf-8 -*-
from ws4py.client.threadedclient import WebSocketClient
import OpenOPC
import time
import json
from cassandra.cluster import Cluster
tmp_log = ""
def error_log(txt):
    import os
    import time
    global tmp_log
    name = "log/"
    name += time.strftime('%Y-%m-%d', time.localtime(time.time()))
    if tmp_log == txt:
        return
    else:
        tmp_log = txt
    stmp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    name += '.txt'
    file = open(name, "a")
    file.write(txt)
    file.write("\t\t")
    file.write(stmp)
    file.write("\n")
    file.close()
    pass
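# Illustrative result (date and message are examples, not taken from a real run):
# a call like error_log("OPC connect failed") appends the line
# "OPC connect failed\t\t2018-09-28 15:48:01" to log/2018-09-28.txt.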
class SQLClass:
    # Wraps the Cassandra session plus the per-item write cache and insert rules.
    def __init__(self, config=None):
        url = config['cassandra_url']
        print url
        try:
            cluster = Cluster([url])
            session = cluster.connect('ss')
            self.session = session
            pass
        except Exception, e:
            print "database connection failed..."
            txt = str(url) + " database connection failed, NO003: " + str(e)
            error_log(txt)
        self.url = url
        self.config = config
        self.collector = None
        self.cache = {}
        self.data_limit_map = None
        pass
    # def getConfig(self):
    #     if self.config:
    #         return self.config
    #     data = self.session.execute('select name,value from config')
    #     config = {}
    #     for (name, value) in data:
    #         config[name] = value
    #     self.config = config
    #     return config
    def getDataLimits(self):
        if self.data_limit_map:
            return self.data_limit_map
        try:
            data = self.config['item_limits']
            pass
        except Exception, e:
            error_log("failed to read item_limits from config...")
            return {}
        data_limit_map = {}
        for (id, data_limit, data_type, standard_data) in data:
            data_limit_map[id] = data_limit
            data_limit_map[id + "data_type"] = data_type
            data_limit_map[id + "standard_data"] = standard_data
        self.data_limit_map = data_limit_map
        return data_limit_map
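    # Illustrative example (values assumed): an item_limits entry such as
    # ["Random.Int2", 50, "I", "100"] becomes the flattened map
    # {"Random.Int2": 50, "Random.Int2data_type": "I", "Random.Int2standard_data": "100"}.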
    # def getCollectItems(self):
    #     if self.collector:
    #         return self.collector
    #     try:
    #         data = self.session.execute("SELECT * from collector")
    #         self.collector = data
    #     except Exception, e:
    #         error_log("database connection failed...")
    #         self.reconnect()
    #     return data
    def reconnect(self):
        try:
            cluster = Cluster([self.url])
            session = cluster.connect('ss')
            self.session = session
            pass
        except Exception, e:
            error_log("database reconnect failed...")
            return False
            # raise e
        pass
    def insertData(self, item_id=None, value=None, timestamp=None, project_id=None, save_time=None):
        import uuid
        count = 0
        last_value = "0"
        # item_id = str(item_id)
        if self.cache.has_key(item_id + '_last'):
            # Cached: take the last written value and counter from the cache
            count = self.cache[item_id + '_last']['write_count']
            last_value = self.cache[item_id + "_last"]["value"]
        else:
            # Not cached: fetch the most recent row for this item
            try:
                # cluster = Cluster(["192.168.2.146"])
                # session = cluster.connect('ss')
                # stat = session.prepare("select * from data where item_id = ? limit 1")
                # last_data = session.execute(stat, ['Random.Int2'])
                # last_value = last_data.value
                # stat = self.session.prepare("select * from data where item_id = ? limit 1")
                last_data = self.session.execute("select * from data where item_id =%s limit 1", [item_id])
                for item in last_data:
                    last_value = item.value
                    # read the counter in the same pass; the result set may not be iterable twice
                    count = item.write_count
                pass
            except Exception, e:
                print str(e)
                error_log("database connection failed... NO006")
                self.reconnect()
                return False
            # print list(last_data)
        # Insert rules
        dataLimits = self.getDataLimits()
        # No limit by default
        limit_boolean = True
        standard_data = last_value
        tmp = value
        insert_boolean = True
        if dataLimits.has_key(item_id):
            limit_value = dataLimits[item_id]
            data_type = dataLimits[item_id + 'data_type']
            # standard_data = dataLimits[item_id+'standard_data']
            try:
                if data_type == "I":
                    tmp = int(tmp)
                    standard_data = int(standard_data)
                if data_type == "F":
                    tmp = float(tmp)
                    standard_data = float(standard_data)
                if data_type == "I" or data_type == "F":
                    max_value = standard_data + (standard_data * limit_value) / 1000
                    min_value = standard_data - (standard_data * limit_value) / 1000
                    # Inside the dead band: the value is filtered out
                    if (tmp <= max_value and tmp >= min_value):
                        limit_boolean = False
            except Exception, e:
                error_log(str(e) + " data conversion error, NO_005")
                self.reconnect()
                return False
            pass
        else:
            tmp = str(tmp)
            standard_data = str(standard_data)
            if tmp == standard_data:
                limit_boolean = False
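        # Worked example (numbers are illustrative): with standard_data=100 and
        # limit_value=50, the band is 100 +/- 100*50/1000 = [95, 105]; readings
        # inside that band set limit_boolean=False and are only written when the
        # time rule below still allows an insert.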
        # Time rule: suppress writes if the last write was within save_time seconds
        if self.cache.has_key(item_id + "_last"):
            long_time = self.cache[item_id + "_last"]['long_time']
            if time.time() - long_time <= save_time:
                insert_boolean = False
        # Value inside the band and time window not elapsed: skip the write
        if not (limit_boolean or insert_boolean):
            return None
        count += 1
        str_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        obj = {'item_id': item_id,
               'write_count': count,
               'project_id': project_id,
               'create_time': str_time,
               'timestamp': timestamp,
               'value': value,
               'long_time': int(time.time())}
        try:
            self.session.execute("""insert into data (item_id,write_count,project_id,create_time,timestamp,value,long_time) values(%(item_id)s,%(write_count)s,%(project_id)s,%(create_time)s,%(timestamp)s,%(value)s,%(long_time)s)""", obj)
        except Exception, e:
            error_log(str(e) + " database connection failed...")
            self.reconnect()
            return None
        self.cache[item_id + '_last'] = obj
        json_str_tmp = json.dumps(obj)
        return obj
    def close(self):
        pass
# websocket connection
class DummyClient(WebSocketClient):
    def opened(self):
        print "open"
        # self.rd.run(self.send)
    def __init__(self, url):
        super(DummyClient, self).__init__(url)
        print '__init__'
    # def closed(self, code, reason=None):
    #     print "Closed down", code, reason
    def received_message(self, m):
        print m
    # Send the data, retrying once over a fresh connection
    def send(self, payload):
        try:
            super(DummyClient, self).send(payload)
        except Exception, e:
            try:
                super(DummyClient, self).__init__(self.url)
                super(DummyClient, self).connect()
                super(DummyClient, self).send(payload)
                # super(DummyClient,self).run_forever()
            except Exception, e:
                print 'websocket connection lost...'
                error_log(str(e) + " websocket connection failed")
                raise e
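# Usage sketch (the endpoint URL is illustrative, not from this project):
#   ws = DummyClient('ws://example.com:8000/collect')
#   ws.connect()
#   ws.send('{"Sn": 1}')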
count = 1
# item => data source, ws => websocket, opc => opc, dv => whether this reading is sent to the dashboard, ar => whether it is sent to archibus
def read(config, ws=None, opc=None, insert=None, speak=None, project_id=None, insert_boolean=None, save_time=None):
    datas = {}
    # Handle continuous saving
    has_inserted = False
    arr = []
    try:
        client = opc.open_client(config['collector']['opc_server_ip'])
        client.connect(config['collector']['opc_name'])
        opc_items = config['collector']['opc_items']
        # print config['config'][""], opc_items
        start = 0
        split = 500
        if config["config"].has_key("split"):
            split = config["config"]["split"]
        end = split
        while True:
            if start >= len(opc_items):
                break
            items = opc_items[start:end]
            for name, value, quality, timestamp in client.read(items):
                # print quality
                if quality == "Error":
                    error_log(name + " " + quality)
                else:
                    has_inserted = insert(name, str(value), timestamp, project_id, save_time)
                    if has_inserted:
                        arr.append(has_inserted)
            time.sleep(1)
            print "[", start, ":", end, "] reading...pause 1 second !!"
            start += split
            end += split
        client.close()
    except Exception, e:
        print "OPC collection failed", e
        error_log(str(e) + " OPC connect failed")
    print 'read', arr
    # print arr
    # arr=['test']
    if ws:
        for item in arr:
            json_str_tmp = json.dumps(item)
            ws.send(json_str_tmp)
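# Illustrative batching example: with 1200 tags and split=500, the loop above
# reads slices [0:500], [500:1000], [1000:1500], pausing one second after each batch.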
def strJSONWS(Action=None, Data=None, Message=None, Level=None, Time=None, attribute=None, datavalue=None):
    import json
    dataSend = {}
    if Action != None:
        global count
        dataSend['Sn'] = count
        count = count + 1
        dataSend['Action'] = Action
    if Data != None:
        dataSend['Data'] = Data
    if Level != None:
        dataSend['Level'] = Level
    if Message != None:
        dataSend['Message'] = Message
    if Time != None:
        dataSend['Time'] = Time
    if attribute != None:
        dataSend['attribute'] = attribute
    if datavalue != None:
        dataSend['datavalue'] = datavalue
    str_json = json.dumps(dataSend)
    return str_json
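# Example call (illustrative; key order in the JSON string may vary):
#   strJSONWS(Action="collect", Data={"Random.Int2": 42})
#   -> '{"Action": "collect", "Data": {"Random.Int2": 42}, "Sn": 1}'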
def readJSON():
    import json
    # file=open("make_data.json")
    # content=file.read()
    # print content
    # data=json.loads(content)
    # return data
    file = "make_data.json"
    try:
        t = open(file).read()
        data = json.loads(t)
    except Exception, e:
        with open(file) as flow:
            data = flow.read()
        data2 = data.replace('\n', '')
        data3 = data2.replace(' ', '')
        a = json.loads(data3)
        with open(file, "w") as workflow:
            writeData = json.dumps(a, sort_keys=True, indent=3)
            workflow.write(writeData)
        t = open(file).read()
        data = json.loads(t)
    return data
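# Sketch of the make_data.json layout this script reads. The key names come from
# the code in this file; the values shown are illustrative assumptions only:
# {
#     "cassandra_url": "127.0.0.1",
#     "item_limits": [["Random.Int2", 50, "I", "100"]],
#     "collector": {
#         "opc_server_ip": "localhost",
#         "opc_name": "Matrikon.OPC.Simulation.1",
#         "opc_items": ["Random.Int2"]
#     },
#     "config": {
#         "web_socket_url": "",
#         "RATE_OF_DV": "5",
#         "SAVE_TIME_RATE": "60",
#         "project_id": "demo",
#         "split": 500
#     }
# }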
class ReadData():
    def __init__(self):
        self.status = False
        # Load the configuration (read from make_data.json)
        data = readJSON()
        # url = self.getCassandraURLSQLite()
        self.sqlObject = SQLClass(config=data)
        self.ws = None
        self.opc = OpenOPC
        self.status = False
        self.config = data
    def setStatus(self, status):
        self.status = status
    def close(self, speak=None):
        print 'close...'
        if (self.ws != None):
            self.ws.close()
    # speak: text shown in the GUI
    def run(self):
        time_count = 0
        insert_boolean = False
        config = self.config
        try:
            if not (config['config']['web_socket_url'] is None or config['config']['web_socket_url'] == ""):
                def test():
                    ws = None
                    try:
                        ws = DummyClient(config['config']['web_socket_url'])
                        ws.connect()
                        print "websocket connect success"
                    except Exception, e:
                        print "webSocket connect failed,NO_006"
                        error_log("webSocket connection failed")
                    return ws
                if self.ws:
                    ws = self.ws
                else:
                    self.ws = test()
                    ws = self.ws
            else:
                ws = None
            # ws = DummyClient('ws://59.110.16.96:8000/v1/nr8l3plk34xlff320k1ykoty9uf20ihi/collect/dms/1503752076')
            while self.status:
                try:
                    st_time = int(self.config['config']['RATE_OF_DV'])
                    sv_time = int(self.config['config']['SAVE_TIME_RATE'])
                    if time_count % sv_time == 0:
                        insert_boolean = True
                    else:
                        insert_boolean = False
                    time_count += st_time
                    time.sleep(st_time)
                    project_id = self.config['config']['project_id']
                    read(config=self.config,
                         ws=ws,
                         opc=self.opc,
                         insert=self.sqlObject.insertData,
                         project_id=project_id,
                         insert_boolean=insert_boolean,
                         save_time=sv_time)
                except Exception, e:
                    print "program exit NO_001" + str(e)
                    time.sleep(5)
                    # # error_log("bad data, closed...")
                    # raise e
        except Exception, e:
            # speak("bad data, NO_002")
            error_log("data error NO_002")
if __name__ == '__main__':
    rd = ReadData()
    rd.setStatus(True)
    rd.run()
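# To run (assumes make_data.json sits in the working directory, and that the OPC
# server and the Cassandra keyspace 'ss' from that config are reachable):
#   python main.py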