加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
onenet_test.py 3.88 KB
一键复制 编辑 原始数据 按行查看 历史
pi 提交于 2019-01-26 00:10 . only the value chaged,will upload data
#!/usr/bin/python3
# -*- coding:utf-8 -*-
# Send data points to data streams that have already been created on the platform
import requests
import json
import time
import datetime
import subprocess
import os
# OneNET credentials and endpoints.
# The literals are the original defaults; set the ONENET_APIKEY and
# ONENET_DEVICE_ID environment variables to override them without editing
# (or committing) secrets in this file.
APIKEY = os.environ.get('ONENET_APIKEY', 'oxdf4E0EGHLv=oKAcJY4PzzwKaY=')  # replace with your own API key
device_ID = os.environ.get('ONENET_DEVICE_ID', '509595439')
headers = {'api-key': APIKEY}
# Uploading (POST) and querying (GET) use the same datapoints endpoint.
puturl = 'https://api.heclouds.com/devices/' + device_ID + '/datapoints'
get_mult_url = puturl
def http_post(y, x, timeout=10):
    """Upload one data point to a data stream of the configured device.

    Args:
        y: ID of the target data stream (for example ``'CPU_temp'``).
        x: Value to store as the new data point.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` can block indefinitely on a stalled
            connection.

    Prints the response body on success. A failed request is reported on
    stdout instead of raising, so one network error does not stop the caller.
    """
    payload = {'datastreams': [{'id': y, 'datapoints': [{'value': x}]}]}
    try:
        r = requests.post(url=puturl, headers=headers,
                          data=json.dumps(payload), timeout=timeout)
    except requests.RequestException as err:
        print('upload to data stream {} failed: {}'.format(y, err))
        return
    print(r.text)
def http_get(timeout=10):
    """Fetch the device's data points and print the raw response body.

    Args:
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` can block indefinitely on a stalled
            connection.

    A failed request is reported on stdout instead of raising.
    """
    try:
        r = requests.get(url=get_mult_url, headers=headers, timeout=timeout)
    except requests.RequestException as err:
        print('data point query failed: {}'.format(err))
        return
    print(r.text)
def get_cpu_temp(path='/sys/class/thermal/thermal_zone0/temp'):
    """Return the CPU temperature as a string with two decimals.

    Args:
        path: File holding the raw reading. The default is the first
            thermal zone exposed by the kernel, which reports millidegrees
            Celsius, hence the division by 1000.

    Returns:
        The reading divided by 1000, formatted like ``'48.31'``.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If its content is not a number.
    """
    # The context manager closes the file even if read() raises.
    with open(path) as sensor:
        raw = sensor.read()
    return '{:.2f}'.format(float(raw) / 1000)
def get_gpu_temp():
    """Return the temperature reported by ``vcgencmd measure_temp``.

    The shell pipeline keeps only the text after ``=``; the trailing
    ``'C`` unit marker is then removed and the number is formatted with
    two decimals.

    Raises:
        ValueError: If the command output is not a number.
    """
    command = "vcgencmd measure_temp|awk -F= '{print $2}'"
    reading = subprocess.getoutput(command)
    without_unit = reading.replace("'C", '')
    return '{:.2f}'.format(float(without_unit))
def get_time_now():
    """Return the current local time as ``' HH:MM:SS\\n YYYY-MM-DD'``.

    The file imports the ``datetime`` module (not the class), so the class
    has to be reached as ``datetime.datetime``; calling ``datetime.now()``
    on the module raises AttributeError.
    """
    return datetime.datetime.now().strftime(' %H:%M:%S\n %Y-%m-%d')
def get_ip_info():
    """Return the ``inet`` addresses of ``wlan0`` and ``net0``.

    Each address is whatever the ``ifconfig | grep | awk`` pipeline prints
    for that interface.

    Returns:
        A ``(wlan_ip, net_ip)`` tuple of strings.
    """
    # NOTE(review): 'net0' is an unusual interface name -- confirm it is
    # not meant to be 'eth0' on the target machine.
    commands = (
        "ifconfig wlan0|grep -w inet|awk '{print $2}'",
        "ifconfig net0|grep -w inet|awk '{print $2}'",
    )
    wireless, wired = map(subprocess.getoutput, commands)
    return wireless, wired
def get_mem_info(free_output=None):
    """Return memory usage (used / total) as a percentage string.

    Args:
        free_output: Text in the format printed by ``free -m``. When
            omitted, ``free -m`` is run once, so that total and used come
            from the same snapshot.

    Returns:
        The percentage formatted with two decimals, e.g. ``'25.00'``.

    Raises:
        ValueError: If no usable ``Mem:`` line is present in the output.
    """
    if free_output is None:
        free_output = subprocess.getoutput('free -m')
    for line in free_output.splitlines():
        fields = line.split()
        # On the "Mem:" row the 2nd and 3rd columns are total and used.
        if 'Mem:' in line and len(fields) >= 3:
            total = float(fields[1])
            used = float(fields[2])
            return '{:.2f}'.format(used / total * 100)
    raise ValueError("no 'Mem:' line found in `free -m` output")
def get_cpu_info(interval=0.2, stat_path='/proc/stat'):
    """Return CPU usage over a short interval as a percentage string.

    Two samples of the first line of ``stat_path`` are taken ``interval``
    seconds apart. Usage is the growth of the 1st and 3rd counters (user
    and system time in /proc/stat) divided by that growth plus the growth
    of the 4th counter (idle time). The 2nd counter (nice) is read but,
    as before, not counted.

    Args:
        interval: Seconds to sleep between the two samples.
        stat_path: File to sample; defaults to the kernel's /proc/stat.

    Returns:
        The percentage formatted with two decimals, or ``'0.00'`` when no
        counter advanced between the samples.
    """
    def _sample():
        # Read the file directly instead of spawning `cat`; the context
        # manager guarantees the handle is closed.
        with open(stat_path) as stat_file:
            counters = stat_file.readline().split()[1:5]
        return [int(value) for value in counters]

    user1, _nice1, system1, idle1 = _sample()
    time.sleep(interval)
    user2, _nice2, system2, idle2 = _sample()

    busy = (user2 - user1) + (system2 - system1)
    total = busy + (idle2 - idle1)
    if total == 0:
        # Nothing changed between the samples; avoid dividing by zero.
        return '0.00'
    return '{:.2f}'.format(float(busy) / float(total) * 100)
def getDisk(path='/'):
    """Return the used share of a filesystem as a percentage string.

    The figures come from ``shutil.disk_usage`` rather than from
    positional slicing of ``df`` output, whose token positions shift with
    the wording of the header line.

    Args:
        path: Any path on the filesystem to inspect; defaults to the root
            filesystem.

    Returns:
        ``used / total * 100`` formatted with two decimals, or ``'0.00'``
        for a filesystem that reports a total size of zero.
    """
    import shutil  # local import: only this function needs it

    usage = shutil.disk_usage(path)
    if usage.total == 0:
        return '0.00'
    return '{:.2f}'.format(usage.used / usage.total * 100)
def main():
    """Poll the system metrics forever and upload each one when it changes.

    Every pass calls the readers in a fixed order and compares each value
    with the one last uploaded for its data stream (initially the empty
    string). Only values that differ are posted. The loop then sleeps for
    5 seconds and never returns.
    """
    # (data stream IDs, reader returning one value per stream ID)
    readers = (
        (('CPU_temp',), lambda: (get_cpu_temp(),)),
        (('GPU_temp',), lambda: (get_gpu_temp(),)),
        (('wlan_ip', 'net_ip'), get_ip_info),
        (('mem_usage',), lambda: (get_mem_info(),)),
        (('cpu_usage',), lambda: (get_cpu_info(),)),
        (('disk_usage',), lambda: (getDisk(),)),
    )
    last_sent = {}
    while True:
        for stream_ids, read in readers:
            for stream_id, value in zip(stream_ids, read()):
                if value != last_sent.get(stream_id, ''):
                    http_post(stream_id, value)
                    # Recorded only after the upload call returns, so a
                    # value whose upload raised is never marked as sent.
                    last_sent[stream_id] = value
        time.sleep(5)
# Start the monitoring loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化