代码拉取完成,页面将自动刷新
同步操作将从 Gitee 极速下载/FileCodeBox 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
from typing import Union
from datetime import datetime, timedelta
from fastapi import Header, HTTPException, Request
import settings
async def admin_required(pwd: Union[str, None] = Header(default=None), request: Request = None):
if 'share' in request.url.path:
if pwd != settings.ADMIN_PASSWORD and not settings.ENABLE_UPLOAD:
raise HTTPException(status_code=403, detail='本站上传功能已关闭,仅管理员可用')
else:
if not pwd or pwd != settings.ADMIN_PASSWORD:
raise HTTPException(status_code=401, detail="密码错误,请重新登录")
class IPRateLimit:
def __init__(self, count, minutes):
self.ips = {}
self.count = count
self.minutes = minutes
def check_ip(self, ip):
# 检查ip是否被禁止
if ip in self.ips:
if self.ips[ip]['count'] >= self.count:
if self.ips[ip]['time'] + timedelta(minutes=self.minutes) > datetime.now():
return False
else:
self.ips.pop(ip)
return True
def add_ip(self, ip):
ip_info = self.ips.get(ip, {'count': 0, 'time': datetime.now()})
ip_info['count'] += 1
self.ips[ip] = ip_info
return ip_info['count']
async def remove_expired_ip(self):
for ip in list(self.ips.keys()):
if self.ips[ip]['time'] + timedelta(minutes=self.minutes) < datetime.now():
self.ips.pop(ip)
def __call__(self, request: Request):
ip = request.headers.get('X-Real-IP', request.headers.get('X-Forwarded-For', request.client.host))
if not self.check_ip(ip):
raise HTTPException(status_code=400, detail=f"请求次数过多,请稍后再试")
return ip
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。