加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
ClientDemo_online.py 3.38 KB
一键复制 编辑 原始数据 按行查看 历史
茗趣yhj 提交于 2021-08-25 16:39 . 添加注释
# -*- coding: utf-8 -*-
import os
import sys
import json
import yaml
from com.aliyun.api.gateway.sdk import client
from com.aliyun.api.gateway.sdk.http import request
from com.aliyun.api.gateway.sdk.common import constant
# Python 2 only: force the process-wide default string encoding to UTF-8.
# ``reload`` as a builtin and ``sys.setdefaultencoding`` are not available on
# Python 3, so the hack is skipped there.
if sys.version_info[0] == 2:
    reload(sys)  # noqa: F821 - builtin on Python 2
    sys.setdefaultencoding('utf8')

# config.yaml is expected in the same directory as this script.
# The ``with`` block closes the file even if YAML parsing raises.
with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.yaml')) as config_file:
    config = yaml.safe_load(config_file)

# Gateway credentials read from the config file.
APP_KEY = config['APP_KEY']
APP_SECRET = config['APP_SECRET']

# Shared gateway client used by every request helper below.
CLI = client.DefaultClient(app_key=APP_KEY, app_secret=APP_SECRET)
# GET request
def request_get(host, url, params):
    """Send a GET request through the module-level client ``CLI``.

    Args:
        host: Host value handed to ``request.Request``.
        url: Request path. When ``params`` is non-empty a
            ``?key=value&key=value`` query string is appended to it.
        params: Mapping of query parameters. ``None`` is treated like an
            empty mapping.

    Returns:
        The JSON-decoded last element of the client's reply when the first
        element (the status) is 200; otherwise a dict of the form
        ``{'status': res[0], 'msg': res[1], 'result': {}}``.
    """
    # Normalise first so a ``None`` argument no longer fails on ``.keys()``.
    params = params or {}
    if params:
        # Values are interpolated as-is (no percent-encoding), matching the
        # original behaviour.
        # NOTE(review): the URL may take part in the SDK's request signing,
        # so confirm with the SDK before introducing any encoding here.
        query = '&'.join('%s=%s' % (key, value) for key, value in params.items())
        url += '?' + query

    req = request.Request(host=host, protocol=constant.HTTP, url=url, method="GET", time_out=30000)
    req.set_query_str(bytearray(source=json.dumps(params), encoding='utf-8'))
    res = CLI.execute(req)

    if res[0] != 200:
        return {
            'status': res[0],
            'msg': res[1],
            'result': {}
        }
    return json.loads(res[-1])
# POST with a form body
def request_post_form(host, url, params):
    """Send a form-encoded POST request through the module-level client ``CLI``.

    Args:
        host: Host value handed to ``request.Request``.
        url: Request path.
        params: Mapping of form fields to send. When it is empty or ``None``
            the original placeholder demo fields are sent instead, so
            callers that relied on the old hard-coded body still work.

    Returns:
        The JSON-decoded last element of the client's reply when the first
        element (the status) is 200; otherwise a dict of the form
        ``{'status': res[0], 'msg': res[1], 'result': {}}``.
    """
    req_post = request.Request(host=host, protocol=constant.HTTP, url=url, method="POST", time_out=30000)

    if params:
        # Copy so the caller's mapping is never shared with the request object.
        body_map = dict(params)
    else:
        # Placeholder payload kept from the original demo.
        body_map = {
            "bodyForm1": "fwefwef",
            "bodyForm2": "ffwefwef",
        }

    req_post.set_body(body_map)
    req_post.set_content_type(constant.CONTENT_TYPE_FORM)
    res = CLI.execute(req_post)

    if res[0] != 200:
        return {
            'status': res[0],
            'msg': res[1],
            'result': {}
        }
    return json.loads(res[-1])
# POST with a raw (stream) body
def request_post_stream(host, url, params):
    """Send a POST request whose body is a JSON document, via the client ``CLI``.

    The body is serialised with ``json.dumps`` and wrapped in a ``bytearray``
    before being attached to the request; the content type is set to
    ``constant.CONTENT_TYPE_STREAM``.

    Args:
        host: Host value handed to ``request.Request``.
        url: Request path.
        params: Mapping to serialise as the request body. When it is empty
            or ``None`` the original placeholder demo record is sent
            instead, so callers that relied on the old hard-coded body
            still work.

    Returns:
        The JSON-decoded last element of the client's reply when the first
        element (the status) is 200; otherwise a dict of the form
        ``{'status': res[0], 'msg': res[1], 'result': {}}``.
    """
    req_post = request.Request(host=host, protocol=constant.HTTP, url=url, method="POST", time_out=30000)

    if params:
        body = dict(params)
    else:
        # Placeholder payload kept from the original demo.
        body = {
            "name": "testName1111111",
            "address": "testAddress",
            "email": "testemail@123.com",
        }

    req_post.set_body(bytearray(source=json.dumps(body), encoding="utf8"))
    req_post.set_content_type(constant.CONTENT_TYPE_STREAM)
    res = CLI.execute(req_post)

    if res[0] != 200:
        return {
            'status': res[0],
            'msg': res[1],
            'result': {}
        }
    return json.loads(res[-1])
if __name__ == '__main__':
    # Demo driver: calls three GET endpoints on the same host and prints
    # whatever request_get returns for each.
    #
    # Apply for API gateway access here:
    # https://apigateway.console.aliyun.com/?spm=5176.12818093.ProductAndService--ali--widget-home-product-recent.dre0.67d216d0e6veOq#/cn-hangzhou/overView/view
    # All endpoints below share this host.
    host = 'http://wuliu.market.alicloudapi.com'

    # Single-argument print(...) behaves the same as the print statement on
    # Python 2 and is also valid syntax on Python 3.

    # Nationwide express logistics tracking query
    print('-----------全国快递物流查询------------')
    url = "/kdi"
    uri_dict = {'no': 773115067077322, 'type': 'STO'}
    res = request_get(host, url, uri_dict)
    print(res)

    # Fetch express company names
    print('-----------获取快递公司名称------------')
    # Other 'type' values noted by the original author: '中通快递', 'STO'.
    # (A dead assignment of {'type': '中通快递'} that was immediately
    # overwritten has been removed.)
    uri_dict = {'type': 'ALL'}
    url = '/getExpressList'
    res = request_get(host, url, uri_dict)
    print(res)

    # Identify the express company from a tracking number
    print('-----------单号识别快递公司------------')
    url = "/exCompany"
    uri_dict = {'no': 773115067077322}
    res = request_get(host, url, uri_dict)
    print(res)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化