代码拉取完成,页面将自动刷新
同步操作将从 JOHNSON/multithreaded-socket 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
from buff import Buff
from urllib.parse import quote, unquote
from typing import AnyStr
JSON_MESSAGE = "JSON"
TEXT_MESSAGE = "TEXT"
FILE_MESSAGE = "FILE"
class HeaderList(dict):
@property
def str(self):
d: str = ""
for i in self.items():
d += "{KEY}: {VALUE}\r\n".format(KEY=i[0], VALUE=i[1])
return d
class Protocol(object):
"""
协议结构如下, 参考 HTTP 协议包结构
type version
key: value
data
协议内容如下, 参考 HTTP 协议包结构
FILE V1.0
Content-Length: 5
12345
头部信息
Content-Length // 内容长度
"""
VERSION: str = "1.0"
TYPE: str
HEADER: HeaderList = HeaderList()
BODY: str
def __init__(self, ty=None, data=None):
if data:
self.set_content(data)
if ty:
self.set_type(ty)
def set_type(self, ty: str):
self.TYPE = ty
def set_header(self, key, value):
self.HEADER[key] = value
def set_content(self, data: AnyStr):
if isinstance(data, str):
data = data.encode()
self.set_header("Content-Length", len(data))
self.BODY: bytes = data
@property
def header(self):
return self.HEADER
@property
def body(self):
return self.BODY
@property
def type(self):
return self.TYPE
@property
def version(self):
return self.VERSION
@staticmethod
def handler(buff: Buff):
l: str = buff.readLine(deocde=True)
c = l.split(" ", 1)
if len(c) < 2:
return None
p = Protocol()
p.TYPE = c[0].strip()
p.VERSION = c[1].strip()
while 1:
h: str = buff.readLine(deocde=True)
e = h.split(": ")
if len(e) != 2:
break
p.HEADER[e[0].strip()] = e[1].strip()
if p.HEADER.get('Content-Length'):
content_length = int(p.HEADER['Content-Length'])
p.BODY = buff.read(content_length)
return p
return None
@property
def raw(self):
# Bytes 数据包结构
return self.TYPE.encode() + b" " + self.VERSION.encode() + b"\r\n" + self.HEADER.str.encode() + b"\r\n" + \
self.BODY
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。