代码拉取完成,页面将自动刷新
同步操作将从 Daz/auto-Api 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
# sys.path.append())
from common.documents import APIModel, Ask, RawValue, Question
import json
from common.constant import RangeType, BaseTypeList
from common.summarize_func import s_Enum, s_Range
from common.constant import QuestionType
from solvers import success_c
def build_args(data):
"""
:param data: 是参数信息
:return: 返回一个RawValue
"""
res = RawValue()
if isinstance(data, dict):
# 字典类型
res.val_type = 'dict'
for key in data:
res.dict_value[key] = build_args(data[key]).uid
elif isinstance(data, list):
# 列表类型
res.val_type = 'list'
for item in data:
res.list_value.append(build_args(item).uid)
res.value_range_type = RangeType.SP_LEN
else: # 基本类型
for tp in BaseTypeList:
if tp.check(data):
res.val_type = str(tp())
break
res.value_range_type = RangeType.HISTORY
res.value_range = json.dumps([data], ensure_ascii=False)
res.uid = RawValue.get_last_uid()
res.required = True
res.available = False
res.save()
return res
def summarize(model, new_one):
"""
通过以往的知识,进行总结
:param model: 过去的知识
:param new_one: 新的示例
:return: None
"""
if model.val_type != new_one.val_type:
model.value_range_type = RangeType.CONFLICT # 类型不一致,学到冲突范围,这是API不规范导致的
if model.value_range_type in [RangeType.FLOW, RangeType.UNBOUND, RangeType, RangeType.CONFLICT]:
# 这些参数不适合再做归纳
return
if model.val_type not in ['list', 'dict']:
# 基本类型归纳
if model.value_range_type in [RangeType.ENUM, RangeType.HISTORY]:
model.value_range = s_Enum(model.value_range, new_one.value_range)
elif model.value_range_type == RangeType.RANGE:
model.value_range = s_Range(model.value_range, new_one.value_range, model.val_type)
elif model.val_type == 'list':
if len(model.list_value) != len(new_one.list_value):
model.val_type = RangeType.VAR_LEN
# 对下面的条目进行归纳
if model.val_type == RangeType.VAR_LEN:
if model.list_value and new_one.list_value:
summarize(RawValue.get_by_uid(model.list_value[0]),
RawValue.get_by_uid(new_one.list_value[0]))
# 仅归纳第一个
elif new_one.list_value:
# 新增一个
model.list_value.append(new_one.list_value[0])
else: # 数量固定
for s, n in zip(map(RawValue.get_by_uid, model.list_value), map(RawValue.get_by_uid, new_one.list_value)):
summarize(s, n)
else: # 对字典类型进行归纳
for key in model.dict_value:
item = RawValue.get_by_uid(model.dict_value[key])
if key not in new_one.dict_value:
item.required = False
else:
summarize(item, RawValue.get_by_uid(new_one.dict_value[key]))
item.save()
for key in new_one.dict_value:
if key not in model.dict_value:
item = RawValue.get_by_uid(new_one.dict_value[key])
model.dict_value[key] = item.uid
item.required = False
item.save()
model.save()
def mark_arg(arg):
# 对所有的子类和本体打上有效标记
arg.available = True
arg.save()
for item in arg.list_value:
mark_arg(RawValue.get_by_uid(uid=item))
for k in arg.dict_value:
mark_arg(RawValue.get_by_uid(uid=arg.dict_value[k]))
def make_questions():
# 创建状态
for model in APIModel.objects():
# 为每个模型构建问题
for q_type in filter(lambda x: "__" not in x, QuestionType.__dict__):
question = Question()
question.que_type = QuestionType.__dict__[q_type]
question.about = model.url
question.solved = False
key = "[{}]-{}".format(question.que_type, question.about)
if not Question.objects(key=key).first():
question.key = key
question.save() # 只有没有这个问题的时候才保存
def clean_args(md):
mark_arg(md.args)
mark_arg(md.result)
for arg in RawValue.objects(available=False):
RawValue.delete(arg)
def build():
checked = set()
for ask in Ask.objects():
if ask.url in checked:
continue
checked.add(ask.url)
print(ask.url)
model = APIModel.objects(url=ask.url).first()
if not model:
model = APIModel()
model.url = ask.url
# 第一步,通过元信息,筛选成功的API
check_list = []
model.has_file = False
for ca in Ask.objects(url=ask.url):
if success_c(ca):
if ca.has_file:
model.has_file = True
check_list.append(ca)
if model.has_file: # 不为带文件的请求构建model
continue
if not check_list:
continue # 没有合格的请求,无视这个url
# 根据已有的信息,聚合信息生成最严格的model
for ca in check_list[-10:]:
if model.args is None:
model.args = build_args(ca.data)
else: # 归纳
summarize(model.args, build_args(ca.data))
result = build_args(json.loads(ca.resp_content.decode()))
if model.result is None:
model.result = result
else:
summarize(model.result, result)
clean_args(model)
model.save()
make_questions()
if __name__ == '__main__':
build()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。