加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
handlers.py 7.54 KB
一键复制 编辑 原始数据 按行查看 历史
thanatosx 提交于 2015-07-03 18:25 . fixing a bug for deleting user batchly
import functools
import inspect
import logging
import os
from urllib import parse
from aiohttp import web
from aiohttp.web_urldispatcher import StaticRoute
import asyncio
from exception import PrivilegeError
from orm import Model
__author__ = 'thanatos'
def get_required_kw_args(func):
args = []
params = inspect.signature(func).parameters
for name, param in params.items():
if param.kind == inspect.Parameter.KEYWORD_ONLY and param.default == inspect.Parameter.empty:
args.append(name)
return tuple(args)
def get_named_kw_args(func):
args = []
params = inspect.signature(func).parameters
for name, param in params.items():
if param.kind == inspect.Parameter.KEYWORD_ONLY:
args.append(name)
return tuple(args)
def get_model_args(func):
args = []
params = inspect.signature(func).parameters
for name, param in params.items():
if param.kind == inspect.Parameter.KEYWORD_ONLY and type(param.default) == type(Model):
args.append(param)
return tuple(args)
def has_named_kw_args(func):
params = inspect.signature(func).parameters
for name, param in params.items():
if param.kind == inspect.Parameter.KEYWORD_ONLY:
return True
def has_var_kw_arg(func):
params = inspect.signature(func).parameters
for name, param in params.items():
if param.kind == inspect.Parameter.VAR_KEYWORD:
return True
def has_request_arg(func):
sig = inspect.signature(func)
params = sig.parameters
found = False
for name, param in params.items():
if name == 'request':
found = True
continue
if found and (
param.kind != inspect.Parameter.VAR_POSITIONAL and param.kind != inspect.Parameter.KEYWORD_ONLY and param.kind != inspect.Parameter.VAR_KEYWORD):
raise ValueError(
'request parameter must be the last named parameter in function: %s%s' % (func.__name__, str(sig)))
return found
def get(path):
"""
get装饰类
get(path)(func)(*args, **kwargs) --> func(*args, **kwargs)
:param path: url路径
:return:
"""
def decorate(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
wrapper.__method__ = 'GET'
wrapper.__route__ = path
return wrapper
return decorate
def post(path):
"""
post装饰类
:param path: url路径
:return:
"""
def decorate(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
wrapper.__method__ = 'POST'
wrapper.__route__ = path
return wrapper
return decorate
def permission(name):
"""
为每个route添加权限控制,类似shiro的用法
:param name:
:return:
"""
def decorate(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
request = kwargs.get('request')
if request is None:
raise ValueError()
user = request.__user__
if not (yield from user.access(name)):
raise PrivilegeError()
return (yield from func(*args, **kwargs))
return wrapper
return decorate
class RequestHandler(object):
"""包装Controller,取出request的参数传入Controller
"""
def __init__(self, app, func):
self._app = app
self._func = func
self._has_request_arg = has_request_arg(func)
self._has_var_kw_arg = has_var_kw_arg(func)
self._has_named_kw_args = has_named_kw_args(func)
self._named_kw_args = get_named_kw_args(func)
self._required_kw_args = get_required_kw_args(func)
self._model_args = get_model_args(func)
@asyncio.coroutine
def __call__(self, request):
kw = None
if self._has_var_kw_arg or self._has_named_kw_args or self._required_kw_args:
# POST
if request.method == 'POST':
if not request.content_type:
return web.HTTPBadRequest(reason='Missing Content-Type.')
ct = request.content_type.lower()
if ct.startswith('application/json'):
params = yield from request.json()
if not isinstance(params, dict):
return web.HTTPBadRequest(reason='JSON body must be object.')
kw = params
elif ct.startswith('application/x-www-form-urlencoded') or ct.startswith('multipart/form-data'):
params = yield from request.post()
kw = dict(**params)
else:
return web.HTTPBadRequest(reason='Unsupported Content-Type: %s' % request.content_type)
# GET
if request.method == 'GET':
qs = request.query_string
if qs:
kw = dict()
for k, v in parse.parse_qs(qs, True).items():
kw[k] = v[0]
if kw is None:
# REST风格的数据解析出来
kw = dict(**request.match_info)
else:
# 参数封装在类内
if self._model_args:
for param in self._model_args:
o = yield from param.default.transfer(kw, False, False)
kw[param.name] = o
# remove all unamed kw:
if not self._has_var_kw_arg and self._named_kw_args:
copy = dict()
for name in self._named_kw_args:
if name in kw:
copy[name] = kw[name]
kw = copy
# check named arg:
for k, v in request.match_info.items():
if k in kw:
logging.warning('Duplicate arg name in named arg and kw args: %s' % k)
kw[k] = v
if self._has_request_arg:
kw['request'] = request
# check required kw:
if self._required_kw_args:
for name in self._required_kw_args:
if name not in kw:
return web.HTTPBadRequest(reason='Missing argument: %s' % name)
return (yield from self._func(**kw))
def add_route(app, func):
"""
将Controller加入路由池
:param app:
:param func: 被包装后的Controller
:return:
"""
method = getattr(func, '__method__', None)
path = getattr(func, '__route__', None)
if path is None or method is None:
raise ValueError('@get or @post not defined in %s.' % str(func))
if not asyncio.iscoroutinefunction(func) and not inspect.isgeneratorfunction(func):
func = asyncio.coroutine(func)
app.router.add_route(method, path, RequestHandler(app, func))
def add_routes(app, module_name):
"""
Controller扫描器
__import__即是会运行一个Module
:param app:
:param module_name:
:return:
"""
n = module_name.rfind('.')
if n == (-1):
mod = __import__(module_name, globals(), locals())
else:
mod = __import__(module_name[:n], globals(), locals(), [module_name[n + 1:]])
for attr in dir(mod):
if attr.startswith('_'):
continue
func = getattr(mod, attr)
if callable(func):
method = getattr(func, '__method__', None)
path = getattr(func, '__route__', None)
if method and path:
add_route(app, func)
def add_static(app):
path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
app.router.add_static('/static/', path)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化