加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
moduleAthena.py 3.48 KB
一键复制 编辑 原始数据 按行查看 历史
1181411728@qq.com 提交于 2021-08-02 13:35 . Initial commit
import datetime
import io
import os
import time

import boto3
import pandas as pd
from botocore.exceptions import ClientError
# Connection settings shared by the Athena/S3 helpers in this module.
#
# SECURITY: an earlier revision hard-coded an AWS access key pair here and it was
# committed to version control -- that key must be revoked/rotated. Credentials are
# now read from the standard AWS environment variables; when they are unset the
# values are None and boto3 falls back to its default credential provider chain
# (shared credentials file, instance/role credentials, ...).
region = "cn-northwest-1"
access_key = os.environ.get("AWS_ACCESS_KEY_ID")
secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
# Bucket that receives Athena query results (under the "res/" prefix).
bucket_output = 'athenaout'
# Low-level client: used for get_object when downloading result files.
s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region)
# Resource interface: used for Object(...).load() existence checks.
s3_file = boto3.resource('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region)
# Check whether an object exists in S3.
def s3_file_check(bucket, key):
    """Return whether ``s3://bucket/key`` exists.

    Issues a HEAD request through the module-level ``s3_file`` resource
    (``Object.load()``).

    Returns:
        True if the object exists, False if S3 reports it as not found.

    Raises:
        botocore.exceptions.ClientError: for any other error (e.g. access
            denied), rather than misreporting the object as present.
    """
    try:
        s3_file.Object(bucket, key).load()
    except ClientError as e:
        # HEAD responses have no body, so the error code is typically the bare
        # HTTP status string "404"; the named codes are accepted defensively.
        code = str(e.response.get('Error', {}).get('Code', ''))
        if code in ('404', 'NoSuchKey', 'NotFound'):
            return False
        raise
    return True
# Run a query through Athena and load its CSV result into a DataFrame.
def get_athena_data_all_db(db, sql):
    """Execute ``sql`` against Athena database ``db`` and return the result.

    The query is started with ``do_athena_sql_all_db``. Its result file is
    expected at ``res/<QueryExecutionId>.csv`` in ``bucket_output``, which
    matches the OutputLocation used when the query is started. That object
    is polled for (180 checks, 0.1 s apart) and then parsed with pandas.

    Returns:
        pandas.DataFrame: the query result, or an empty DataFrame if the
        query could not be started, the result file did not appear in time,
        or downloading/parsing it failed (the error is printed).
    """
    try:
        response = do_athena_sql_all_db(db, sql)
        if response is None:
            # The query could not be started; the error was already printed.
            return pd.DataFrame()
        key = "res/" + response['QueryExecutionId'] + '.csv'
        if not _wait_for_s3_object(bucket_output, key):
            print('athena result file not found: s3://%s/%s' % (bucket_output, key))
            return pd.DataFrame()
        # Short grace period after the object becomes visible, kept from the
        # original implementation.
        time.sleep(0.1)
        obj = s3.get_object(Bucket=bucket_output, Key=key)['Body'].read()
        return pd.read_csv(io.BytesIO(obj))
    except Exception as e:
        print(e)
        # Return an empty *instance*; returning the DataFrame class itself
        # would make checks like ``df.empty`` silently misbehave.
        return pd.DataFrame()


def _wait_for_s3_object(bucket, key, attempts=180, interval=0.1):
    """Poll ``s3_file_check`` until the object exists; return whether it appeared."""
    for _ in range(attempts):
        time.sleep(interval)
        if s3_file_check(bucket, key):
            return True
    return False
# Start an Athena SQL query.
def do_athena_sql_all_db(db, in_sql):
    """Start ``in_sql`` on Athena against database ``db``.

    Athena can only deliver query results to S3, so the OutputLocation is set
    to ``s3://<bucket_output>/res/``; ``get_athena_data_all_db`` later loads
    the result file from there. This function returns as soon as
    ``start_query_execution`` returns -- it does not wait for the query to
    finish.

    Returns:
        dict | None: the ``start_query_execution`` response (which carries
        ``QueryExecutionId``), or None if the query could not be started
        (the error is printed).
    """
    athena = boto3.client('athena', aws_access_key_id=access_key,
                          aws_secret_access_key=secret_key, region_name=region)
    # Derived from bucket_output so the write location and the location
    # get_athena_data_all_db reads from cannot drift apart.
    s3_output = "s3://%s/res/" % bucket_output
    try:
        return athena.start_query_execution(
            QueryString=in_sql,
            QueryExecutionContext={'Database': db},
            ResultConfiguration={'OutputLocation': s3_output},
        )
    except Exception as e:
        print('athena have error')
        print(e)
        print('-----------------------------------------')
        # Previously fell through to ``return response`` with ``response``
        # unbound, raising UnboundLocalError.
        return None
# Build an Athena partition-pruning condition from a start/end time range.
def get_partition_condition(start, end):
    """Return an SQL fragment restricting partition columns to a time range.

    The fragment begins with " and " so it can be appended to an existing
    WHERE clause. Partition columns are filled as partition_0 = 4-digit year,
    partition_1 = 2-digit month, partition_2 = 2-digit day and
    partition_3 = 2-digit hour. (NOTE(review): this year/month/day/hour
    layout is inferred from the zero-padding used here -- confirm against the
    table DDL.)

    The finest equality filter shared by both bounds is emitted:
      * different years  -> partition_0 IN (every year in the range)
      * different months -> year equality + partition_1 IN (every month)
      * same month       -> year + month, plus day if equal, plus hour if the
                            day and hour are both equal.

    Args:
        start: range start, formatted "%Y-%m-%d %H:%M:%S".
        end: range end, same format. If ``start`` is after ``end`` the bounds
            are swapped (previously this produced an invalid empty ``IN ()``).

    Raises:
        ValueError: if either string does not match the expected format.
    """
    fmt = "%Y-%m-%d %H:%M:%S"
    s = datetime.datetime.strptime(start, fmt)
    e = datetime.datetime.strptime(end, fmt)
    if s > e:
        s, e = e, s

    if s.year != e.year:
        years = ",".join("'%04d'" % y for y in range(s.year, e.year + 1))
        return " and partition_0 in (%s)" % years

    condition = " and partition_0='%04d'" % s.year
    if s.month != e.month:
        months = ",".join("'%02d'" % m for m in range(s.month, e.month + 1))
        return condition + " and partition_1 in (%s)" % months

    condition += " and partition_1='%02d'" % s.month
    if s.day == e.day:
        condition += " and partition_2='%02d'" % s.day
        if s.hour == e.hour:
            condition += " and partition_3='%02d'" % s.hour
    return condition
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化