加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
refresh_query.py 7.87 KB
一键复制 编辑 原始数据 按行查看 历史
袁长刚 提交于 2021-12-22 15:07 . v1.0.0
# coding=utf-8
###########################
#本脚本用于redash刷新带参数的查询
#Config
###########################
#需要刷新的redash环境基础域名
from time import sleep
base_url = "http://localhost:5000"
#需要刷新的查询,将查询id加到这个list中
queries = [215]
#管理员的apikey,通过设置界面查看
api_key = "ubtFoc1HR7eUP3ZAn7v3ck6ku2AxFyLlYN4U7Hu2"
###########################
#Script
###########################
def get_time_parse(value_type,value):
import datetime
from datetime import timedelta
now= datetime.datetime.now()
#今天
today= now
#昨天
yesterday= now- timedelta(days=1)
#本周第一天和最后一天
this_week_start= now- timedelta(days=now.weekday())
this_week_end= now+ timedelta(days=6-now.weekday())
#上周第一天和最后一天
last_week_start= now- timedelta(days=now.weekday()+7)
last_week_end= now- timedelta(days=now.weekday()+1)
#本月第一天和最后一天
this_month_start= datetime.datetime(now.year, now.month,1)
this_month_end= datetime.datetime(now.year, now.month+ 1,1)- timedelta(days=1)
#上月第一天和最后一天
last_month_end= this_month_start- timedelta(days=1)
last_month_start= datetime.datetime(last_month_end.year, last_month_end.month,1)
#本季第一天和最后一天
month= (now.month- 1)- (now.month- 1)% 3 + 1
this_quarter_start= datetime.datetime(now.year, month,1)
this_quarter_end= datetime.datetime(now.year, month+ 3,1)- timedelta(days=1)
#上季第一天和最后一天
last_quarter_end= this_quarter_start- timedelta(days=1)
last_quarter_start= datetime.datetime(last_quarter_end.year, last_quarter_end.month- 2,1)
#本年第一天和最后一天
this_year_start= datetime.datetime(now.year,1,1)
this_year_end= datetime.datetime(now.year+ 1,1,1)- timedelta(days=1)
#去年第一天和最后一天
last_year_end= this_year_start- timedelta(days=1)
last_year_start= datetime.datetime(last_year_end.year,1,1)
#过去7天
last_7_days_start=now -timedelta(days=7)
#过去14天
last_14_days_start=now -timedelta(days=14)
#过去30天
last_30_days_start=now -timedelta(days=30)
#过去90天
last_90_days_start=now -timedelta(days=90)
#过去12个月
last_12_months_start=now -timedelta(days=365)
all_parse_time_dict={
"d_today": today,
"d_yesterday": yesterday,
"d_this_day":{"start":today,"end":today},
"d_last_day":{"start":yesterday,"end":yesterday},
"d_this_week":{"start":this_week_start,"end":this_week_end},
"d_this_month":{"start":this_month_start,"end":this_month_end},
"d_this_quarter":{"start":this_quarter_start,"end":this_quarter_end},
"d_this_year":{"start":this_year_start,"end":this_year_end},
"d_last_week":{"start":last_week_start,"end":last_week_end},
"d_last_month":{"start":last_month_start,"end":last_month_end},
"d_last_year":{"start":last_year_start,"end":last_year_end},
"d_last_7_days":{"start":last_7_days_start,"end":now},
"d_last_14_days":{"start":last_14_days_start,"end":now},
"d_last_30_days":{"start":last_30_days_start,"end":now},
"d_last_90_days":{"start":last_90_days_start,"end":now},
"d_last_12_months":{"start":last_12_months_start,"end":now},
}
datetime_type=[
"date",
"datetime-local",
"datetime-with-seconds"
]
datetime_range_type=[
"date-range",
"datetime-range",
"datetime-range-with-seconds"
]
if value_type in datetime_type:
if value_type == "date":
parse_time = all_parse_time_dict[value].strftime("%Y-%m-%d")
elif value_type == "datetime-local":
parse_time = all_parse_time_dict[value].strftime("%Y-%m-%d %H:%M")
else:
parse_time = all_parse_time_dict[value].strftime("%Y-%m-%d %H:%M:%S")
return parse_time
else:
if value_type == "date-range":
parse_time = {
"start":all_parse_time_dict[value]["start"].strftime("%Y-%m-%d"),
"end":all_parse_time_dict[value]["end"].strftime("%Y-%m-%d")
}
elif value_type == "datetime-range":
parse_time = {
"start":all_parse_time_dict[value]["start"].strftime("%Y-%m-%d 00:00"),
"end":all_parse_time_dict[value]["end"].strftime("%Y-%m-%d 23:59")
}
else:
parse_time = {
"start":all_parse_time_dict[value]["start"].strftime("%Y-%m-%d 00:00:00"),
"end":all_parse_time_dict[value]["end"].strftime("%Y-%m-%d 23:59:59")
}
return parse_time
def get_query_parameters_list(query_id,api_key,base_url):
import requests,json
uri = "/api/queries/%d" % query_id
api_key = "Key "+ api_key
headers = {
"Authorization":api_key,
"Content-Type":"application/json"
}
# print(uri)
try:
res = requests.get(base_url+uri,headers=headers)
except:
return print("network error")
if res.status_code != 200:
return print("query does not exist!!")
else:
query_parameters_list = json.loads(res.content)["options"]["parameters"]
return query_parameters_list
def refresh_query_result(query_id,query_parameters_list,api_key,base_url):
import requests,json
uri = "/api/queries/%d/results" %query_id
api_key = "Key " + api_key
headers = {
"Authorization":api_key,
"Content-Type":"application/json"
}
datetime_type=[
"date",
"datetime-local",
"datetime-with-seconds"
]
datetime_value=[
"d_today",
"d_yesterday"
]
datetime_range_type=[
"date-range",
"datetime-range",
"datetime-range-with-seconds"
]
datetime_range_value = [
"d_this_day",
"d_last_day",
"d_this_week",
"d_this_month",
"d_this_quarter",
"d_this_year",
"d_last_week",
"d_last_month",
"d_last_year",
"d_last_7_days",
"d_last_14_days",
"d_last_30_days",
"d_last_90_days",
"d_last_12_months",
]
query_parameters={}
for parameter in query_parameters_list:
if parameter["type"] in datetime_range_type:
if parameter["value"] in datetime_range_value:
time_value=get_time_parse(parameter["type"],parameter["value"])
else:
time_value=parameter["value"]
query_parameters[parameter["name"]]=time_value
elif parameter["type"] in datetime_type:
if parameter["value"] in datetime_value:
time_value=get_time_parse(parameter["type"],parameter["value"])
else:
time_value=parameter["value"]
query_parameters[parameter["name"]]=time_value
else:
query_parameters[parameter["name"]]=parameter["value"]
query_parameters_dict={
"id":query_id,
"max_age":0,
"parameters":query_parameters
}
data = json.dumps(query_parameters_dict)
# print(data)
try:
res = requests.post(base_url+uri,headers=headers,data=data)
except:
return print("network error")
if res.status_code != 200:
# print(res.status_code)
# return print("query does not exist!!")
return res.content
else:
refresh_res_dict = json.loads(res.content)
if refresh_res_dict["job"]["error"] != "":
return print("error"+refresh_res_dict["job"]["error"])
else:
return res.content
querys = [121,180,177,225,225,160]
for query in range(1,500):
try:
print("参数:",query,api_key,base_url)
query_parameter = get_query_parameters_list(query,api_key,base_url)
set_query_refresh = refresh_query_result(query,query_parameter,api_key,base_url)
print(query_parameter,set_query_refresh)
except Exception as ex:
print(ex)
sleep(600)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化