代码拉取完成,页面将自动刷新
#!/usr/bin/python
# coding:utf-8
import time
from flask import Flask, render_template, request
import requests, socket,pexpect,ping3
import pymysql,re,subprocess
from requests.exceptions import Timeout
from datetime import datetime
from io import StringIO
from ping3 import ping
# Flask application object; the route decorators below register on it.
app = Flask(__name__)

# One module-level MySQL connection shared by every handler in this file.
conn = pymysql.connect(
    host="127.0.0.1",
    port=3306,
    user="urlops",
    password="urlops",
    db="urlops",
    charset="utf8",
    autocommit=True,
)

# Shared cursor obtained from the connection above via cursor().
cursor = conn.cursor()

# Read every row of the urlops table once at import time.
sql1 = "SELECT * FROM urlops;"
cursor.execute(sql1)
res = cursor.fetchall()
# Home page
@app.route('/')
def demo():
    """Render index.html with the certificate rows returned by get_ssl()."""
    cert_rows = get_ssl()
    print('结果是:', cert_rows)
    return render_template('index.html', resslist=cert_rows)
# Certificate refresh
@app.route('/sslkey')
def ssl():
    """Refresh certificate data for every stored domain, then render index.html.

    Selects the ``domain`` column of the sslkey table, calls ``sslkey()`` once
    per domain, and renders the page from a fresh ``get_ssl()`` result.
    """
    domain_query = "SELECT s.domain FROM sslkey as s ;"
    conn.ping(reconnect=True)
    cursor.execute(domain_query)
    for row in cursor.fetchall():
        # Each row is a one-column tuple, so the domain is its first element.
        name = row[0]
        print("域名", name)
        sslkey(str(name))
    return render_template('index.html', resslist=get_ssl())
# Add a certificate domain
@app.route('/ssladd', methods=['POST'])
def get_ssladd():
    """Store the posted domain, refresh certificate data and render index.html.

    Reads the ``domain`` form field. A missing or blank field is skipped
    rather than being stored as the literal text "None", which the refresh
    step would then try to probe as a host name.

    Returns:
        The response produced by ``ssl()``, i.e. index.html rendered from a
        fresh ``get_ssl()`` query after all domains have been refreshed.
    """
    domain = (request.form.get("domain") or "").strip()
    if domain:
        ssladd(domain)
        print('新增一条域名')
    # ssl() already refreshes every domain and renders index.html from a new
    # get_ssl() call, so its response is returned instead of rendering twice.
    return ssl()
# Site-monitor listing route
@app.route('/systeminfo')
def get_url():
    """Render systeminfo.html with the probe results from get_select()."""
    return render_template('systeminfo.html', urlslist=get_select())
# Site-monitor create / modify route
@app.route('/add', methods=['POST'])
def get_add():
    """Insert or update a urlops row from the posted form, then re-render the list.

    When the form carries a truthy ``id`` the row is updated through
    ``updateurl()``; otherwise a new row is created through ``addurl()``.
    """
    form = request.form
    record_id = form.get("id")
    mission_name = form.get("mission_name")
    address = form.get("address")
    state = form.get("state")

    if not record_id:
        addurl(mission_name, address, state)
        print("新增成功")
    else:
        updateurl(record_id, mission_name, address, state)
        print("更新成功")

    return render_template('systeminfo.html', urlslist=get_select())
# Site-monitor delete route
# NOTE(review): the path is spelled "dalete"; it is kept unchanged because
# links elsewhere (e.g. templates) presumably point at it - confirm before renaming.
@app.route('/dalete/<int:id>', methods=['GET'])
def get_delete(id):
    """Delete the urlops row with the given id and re-render the site list.

    Args:
        id: Primary key taken from the URL (converted to int by the route).

    Returns:
        systeminfo.html rendered with the result of ``get_select()``.
    """
    print("传进来id:", id)
    # Other helpers in this file close the shared connection, so make sure it
    # is alive before using the shared cursor (same pattern as the queries).
    conn.ping(reconnect=True)
    cursor.execute("DELETE FROM urlops WHERE id=%s;", (id,))
    conn.commit()
    print('删除成功!')
    return render_template('systeminfo.html', urlslist=get_select())
# Site-monitor edit route
@app.route('/update/<int:id>', methods=['GET'])
def get_update(id):
    """Render the site list with one row's values supplied for editing.

    Args:
        id: Primary key taken from the URL (converted to int by the route).

    Returns:
        systeminfo.html rendered with ``urlslist`` and, when the row exists,
        its ``id``, ``mission_name``, ``address`` and ``state``. When no row
        has this id, only the list is rendered.
    """
    print("修改的id", id)
    urlslist = get_select()
    updateedit = editupdate(id)
    print("获取查询id", updateedit)
    if updateedit is None:
        # fetchone() found nothing; indexing None would raise TypeError.
        return render_template('systeminfo.html', urlslist=urlslist)

    row_id = updateedit[0]
    mission_name = updateedit[1]
    address = updateedit[2]
    # The other readers of "SELECT * from urlops" in this file take the port
    # from column 3 and the state from column 4, so state is read from 4 here.
    # NOTE(review): confirm the column order against the table schema.
    state = updateedit[4]
    print("结果是:", row_id, mission_name, address, state)
    return render_template(
        'systeminfo.html',
        urlslist=urlslist,
        id=row_id,
        mission_name=mission_name,
        address=address,
        state=state,
    )
# TCP monitoring page
@app.route('/tcpinfo')
def get_tcp():
    """Render tcpinfo.html with the probe results from get_tcp_select()."""
    return render_template('tcpinfo.html', tcplist=get_tcp_select())
# Ping probe page
@app.route('/pinginfo')
def get_ping():
    """Render pinginfo.html with the probe results from get_ping_select()."""
    return render_template('pinginfo.html', pinglist=get_ping_select())
# Certificate query helper
def get_ssl():
    """Return every sslkey row as a dict.

    The first six columns of each row are mapped, in order, to the keys
    ``id``, ``domain``, ``common_name``, ``start_date_st``,
    ``expire_date_st`` and ``remaining``.
    """
    query = "select * from sslkey ;"
    conn.ping(reconnect=True)
    cursor.execute(query)
    keys = (
        'id',
        'domain',
        'common_name',
        'start_date_st',
        'expire_date_st',
        'remaining',
    )
    # Index each column explicitly so a short row still raises IndexError.
    return [
        {key: row[position] for position, key in enumerate(keys)}
        for row in cursor.fetchall()
    ]
# Certificate insert helper
def ssladd(domain):
    """Insert a new row into sslkey holding only the domain.

    Args:
        domain: Domain name to store; passed to the driver as a bound
            parameter so it is escaped rather than spliced into the SQL text.
    """
    # The connection is shared module-wide and may have been closed by
    # another helper, so revive it before using the shared cursor.
    conn.ping(reconnect=True)
    cursor.execute("INSERT INTO sslkey(domain) VALUES(%s)", (domain,))
    conn.commit()
    # The shared connection is deliberately left open: closing it here would
    # break any later caller that runs a statement without pinging first.
# Site-monitor query helper
def get_select():
    """Probe every urlops row whose state is "http" and return the results.

    Each result dict carries ``id``, ``name``, ``url`` and ``state`` taken
    from the row, plus ``code``: "正常" for an HTTP 200 response, the numeric
    status code for any other response, or '探测失败' when the request raises.
    """
    query = 'SELECT * from urlops WHERE state="http";'
    conn.ping(reconnect=True)
    cursor.execute(query)

    urlslist = []
    for row in cursor.fetchall():
        entry = {
            'id': row[0],
            'name': row[1],
            'url': row[2],
            'state': row[4],
        }
        # Request status
        try:
            status = requests.get(entry['url'], timeout=5).status_code
            probe = "正常" if status == 200 else status
        except Exception as exc:
            print(exc)
            probe = '探测失败'
        entry['code'] = probe
        urlslist.append(entry)

    print("结果", urlslist)
    return urlslist
# TCP monitor query helper
def get_tcp_select(timeout=5):
    """Try a TCP connection for every urlops row whose state is "tcp".

    Args:
        timeout: Seconds to wait for each connection attempt before the
            target is reported as failed.

    Returns:
        A list of dicts with ``id``, ``name``, ``url``, ``port`` and ``state``
        taken from the row, plus ``code``: "正常" when the connection succeeds
        or '探测失败' when it fails or the stored port is unusable.
    """
    sql2 = 'SELECT * from urlops WHERE state="tcp";'
    conn.ping(reconnect=True)
    cursor.execute(sql2)

    tcplist = []
    for url in cursor.fetchall():
        result_dict = {
            'id': url[0],
            'name': url[1],
            'url': url[2],
            'port': url[3],
            'state': url[4],
        }
        # Connection status
        try:
            # The context manager closes the socket on every path; the
            # timeout keeps one filtered port from stalling the whole page.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
                probe.settimeout(timeout)
                probe.connect((url[2], int(url[3])))
            code_fin1 = "正常"
        except (OSError, ValueError, TypeError, OverflowError):
            # OSError covers refused/unreachable/timed-out connections; the
            # others cover a missing, non-numeric or out-of-range port value.
            print('探测失败,确认是否有防火墙拦截探测')
            code_fin1 = '探测失败'
        result_dict['code'] = code_fin1
        tcplist.append(result_dict)

    print("结果", tcplist)
    return tcplist
# Ping host-liveness check
def get_ping_select():
    """Ping every urlops row whose state is "ping" and return the results.

    Returns:
        A list of dicts with ``id``, ``name``, ``url``, ``port`` and ``state``
        taken from the row, plus ``code``: "正常" when a reply delay was
        measured, otherwise '探测失败'.
    """
    sql3 = 'SELECT * from urlops WHERE state="ping";'
    conn.ping(reconnect=True)
    cursor.execute(sql3)

    pinglist = []
    for url in cursor.fetchall():
        result_dict = {
            'id': url[0],
            'name': url[1],
            'url': url[2],
            'port': url[3],
            'state': url[4],
        }
        delay = ping3.ping(url[2], timeout=1, unit='ms')
        # ping3 reports a timeout as None and an error (e.g. an unresolvable
        # host) as False; both mean the host did not answer. Identity checks
        # are used so a legitimate delay of 0 is still counted as a reply.
        if delay is None or delay is False:
            print('探测失败')
            code_fin1 = '探测失败'
        else:
            code_fin1 = "正常"
        result_dict['code'] = code_fin1
        pinglist.append(result_dict)
    return pinglist
# Insert helper
def addurl(mission_name, address, state):
    """Insert a new row into urlops.

    Args:
        mission_name: Value for the ``mission_name`` column.
        address: Value for the ``address`` column.
        state: Value for the ``state`` column.

    All three values are sent as bound parameters, so quotes in form input
    cannot alter the statement and a missing (None) field is stored as NULL
    instead of raising during string concatenation.
    """
    # The shared connection may have been closed by another helper.
    conn.ping(reconnect=True)
    cursor.execute(
        "INSERT INTO urlops(mission_name,address,state) VALUES(%s,%s,%s)",
        (mission_name, address, state),
    )
    conn.commit()
    # The shared connection is deliberately left open for later callers.
# Update helper
def updateurl(id, mission_name, address, state):
    """Update the mission_name, address and state of one urlops row.

    Args:
        id: Primary key of the row to update.
        mission_name: New value for the ``mission_name`` column.
        address: New value for the ``address`` column.
        state: New value for the ``state`` column.

    Every value, including ``id``, is sent as a bound parameter so form input
    cannot alter the statement.
    """
    conn.ping(reconnect=True)
    cursor.execute(
        "UPDATE urlops SET mission_name=%s,address=%s,state=%s WHERE id=%s",
        (mission_name, address, state, id),
    )
    conn.commit()
    # The shared connection is deliberately left open for later callers.
# Look a row up by id before an update so the front end can display it
def editupdate(id):
    """Return the urlops row with the given id.

    Args:
        id: Primary key of the row to fetch; sent as a bound parameter.

    Returns:
        The row as returned by ``cursor.fetchone()``, or ``None`` when no row
        matches.
    """
    conn.ping(reconnect=True)
    cursor.execute("SELECT * from urlops WHERE id=%s", (id,))
    # The shared connection is deliberately left open for later callers.
    return cursor.fetchone()
# SSL certificate monitoring
def sslkey(domain):
    """Fetch a domain's certificate details with curl and store them in sslkey.

    Runs ``curl -Ivs https://<domain>``, extracts the start date, expiry date,
    common name and issuer from its verbose output, prints them, and updates
    the matching sslkey row with the common name, both dates and the number
    of days remaining.

    When curl cannot be run, its output does not contain the expected
    certificate lines, or the dates are not in the expected format, a message
    is printed and the row is left unchanged so that one bad domain does not
    abort a refresh of all domains.

    Args:
        domain: Host name to probe; also the key of the row to update.
    """
    # Function-scope import: the file only imports ``datetime`` itself.
    from datetime import timezone

    date_format = "%b %d %H:%M:%S %Y GMT"

    # Argument list with no shell, so characters in ``domain`` cannot be
    # interpreted as shell syntax. curl writes the certificate details to
    # stderr, hence stderr is merged into the captured output.
    comm = ["curl", "-Ivs", f"https://{domain}", "--connect-timeout", "10"]
    try:
        completed = subprocess.run(
            comm,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
    except OSError as exc:
        print('证书信息获取失败:', domain, exc)
        return

    m = re.search('start date: (.*?)\n.*?expire date: (.*?)\n.*?common name: (.*?)\n.*?issuer: CN=(.*?)\n', completed.stdout, re.S)
    if m is None:
        # Connection failure or a curl build that prints a different layout.
        print('证书信息获取失败:', domain)
        return
    start_text, expire_text, common_name, issuer = m.groups()

    try:
        start_date = datetime.strptime(start_text, date_format)
        expire_date = datetime.strptime(expire_text, date_format)
    except ValueError as exc:
        print('证书信息获取失败:', domain, exc)
        return
    start_date_st = start_date.strftime("%Y-%m-%d %H:%M:%S")
    expire_date_st = expire_date.strftime("%Y-%m-%d %H:%M:%S")

    # Days remaining: the certificate dates are GMT, so compare against the
    # current UTC time (made naive to match the parsed value).
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    remaining = (expire_date - now_utc).days

    print('域名:', domain)
    print('通用名:', common_name)
    print('开始时间:', start_date_st)
    print('到期时间:', expire_date_st)
    print(f'剩余时间: {remaining}天')
    print('颁发机构:', issuer)
    print('*' * 30)
    # Short pause kept from the original; presumably spaces out consecutive lookups.
    time.sleep(0.10)

    # curl may have taken several seconds; make sure the connection is alive.
    conn.ping(reconnect=True)
    cursor.execute(
        "UPDATE sslkey SET common_name=%s,start_date_st=%s,expire_date_st=%s,remaining=%s WHERE domain=%s",
        (common_name, start_date_st, expire_date_st, str(remaining), domain),
    )
    conn.commit()
if __name__ == '__main__':
    import os

    # The server listens on all interfaces, and the interactive debugger lets
    # anyone who can reach it run code, so debug mode is opt-in through the
    # FLASK_DEBUG environment variable instead of being always on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true')
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。