加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
ks_old.py 24.18 KB
一键复制 编辑 原始数据 按行查看 历史
import binascii
import json, os
import logging
import random
import re
import time
import websocket
import requests
import _thread
from protobuf_inspector.types import StandardParser
from google.protobuf import json_format
from protobuf_inspector.types import StandardParser
from google.protobuf import json_format
from ks_pb2 import CSWebEnterRoom
from ks_pb2 import CSWebHeartbeat
from ks_pb2 import SocketMessage
from ks_pb2 import SCHeartbeatAck
from ks_pb2 import PayloadType
from ks_pb2 import SCWebFeedPush
from ks_pb2 import SCWebLiveWatchingUsers
from ks_pb2 import SCWebEnterRoomAck
from utils.common import Common
from utils.logger import Configure_logger
from utils.my_handle import My_handle
# Project helper object; in this file it is only used to build the log file name.
common = Common()
# Log file path. NOTE(review): get_bj_time(1) presumably returns a Beijing-time
# stamp suitable for a file name - confirm against utils.common.
file_path = "./log/log-" + common.get_bj_time(1) + ".txt"
Configure_logger(file_path)
# Project handler: supplies the room id (get_room_id) and receives every parsed
# comment (process_data) further down in this file.
my_handle = My_handle("config.json")
# NOTE(review): a plain constructor call does not normally evaluate to None, so
# this guard probably never fires - confirm how My_handle reports init failure.
if my_handle is None:
    print("程序初始化失败!")
    os._exit(0)
# Module-level placeholders; none of them is read or written by the code
# visible in this file.
liveRoomId = None
ttwid = None
roomStore = None
liveRoomTitle = None
proxy = None
class Tool:
    """Kuaishou live-room client: WebSocket comment listener plus GraphQL helpers.

    Typical use: create an instance, call ``init(liveUrl, cookie)`` and then
    ``wssServerStart()``, which blocks while the WebSocket connection is open.
    Incoming comment feeds are printed and forwarded to the module-level
    ``my_handle``.
    """

    # GraphQL endpoint used by liveGraphql().
    apiHost = 'https://live.kuaishou.com/live_graphql'
    # Token sent in the enter-room packet; filled in by wssServerStart().
    token = ''
    # Cookie string of the web session; set by init().
    cookie = ''
    # WebSocket address; filled in by wssServerStart().
    webSocketUrl = ''
    # Live stream id; filled in by getLiveRoomId().
    liveRoomId = None
    # URL of the live room web page; set by init().
    liveUrl = ''
    # Timeout (seconds) applied to every HTTP request made by this class so a
    # stalled server cannot hang the program forever.
    requestTimeout = 10
    # Common request headers. init() works on a per-instance copy, so this
    # class-level dict is never mutated.
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
        'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36',
    }

    def init(self, liveUrl: str, cookie: str):
        """Store the live room URL and cookie and prepare the request headers.

        Args:
            liveUrl: URL of the live room page; also sent as ``Referer``.
            cookie: Cookie header value of the web session.
        """
        self.liveUrl = liveUrl
        self.cookie = cookie
        # Copy first: writing into the class-level dict would leak the cookie
        # and Referer into every other Tool instance.
        self.headers = dict(self.headers)
        self.headers['Referer'] = self.liveUrl
        self.headers['cookie'] = self.cookie

    def getLiveRoomId(self):
        """Fetch the live room page and extract the live stream id.

        The id is read from the JSON state object embedded in the page HTML
        and is also stored in ``self.liveRoomId``.

        Returns:
            The live stream id.

        Raises:
            RuntimeError: if the embedded state cannot be found or carries no
                live stream id.
        """
        liveUrl = self.liveUrl.strip('/')
        res = requests.get(url=liveUrl, headers=self.headers, timeout=self.requestTimeout)
        matched = re.search(
            r'_STATE__=(.*?);\(function\(\)\{var s;\(s=document\.currentScript\|\|document\.scripts\[document\.scripts\.length-1]\)\.parentNode\.r',
            res.text)
        if matched is None:
            # The page layout changed or the request was rejected.
            raise RuntimeError('liveRoomId获取失败')
        state = json.loads(matched.group(1))
        try:
            self.liveRoomId = state['liveroom']['liveStream']['id']
        except (KeyError, TypeError) as err:
            raise RuntimeError('liveRoomId获取失败') from err
        if not self.liveRoomId:
            raise RuntimeError('liveRoomId获取失败')
        return self.liveRoomId

    def getWebSocketInfo(self, liveRoomId):
        """Request the WebSocket connection info for a live stream.

        Args:
            liveRoomId: Live stream id as returned by getLiveRoomId().

        Returns:
            The decoded JSON response; wssServerStart() reads
            ``['data']['token']`` and ``['data']['websocketUrls']`` from it.
        """
        res = requests.get(
            "https://live.kuaishou.com/live_api/liveroom/websocketinfo?liveStreamId=" + liveRoomId,
            headers=self.headers,
            timeout=self.requestTimeout,
        ).json()
        return res

    def wssServerStart(self):
        """Resolve the room, open the WebSocket and block until it closes."""
        rid = self.getLiveRoomId()
        wssInfo = self.getWebSocketInfo(rid)
        self.token = wssInfo['data']['token']
        self.webSocketUrl = wssInfo['data']['websocketUrls'][0]
        websocket.enableTrace(False)
        # Long-lived connection; the callbacks below drive the protocol.
        ws = websocket.WebSocketApp(
            self.webSocketUrl, on_message=self.onMessage, on_error=self.onError, on_close=self.onClose,
            on_open=self.onOpen
        )
        ws.run_forever()

    def onMessage(self, ws: "websocket.WebSocketApp", message: bytes):
        """Decode one incoming frame and dispatch it by payload type.

        Comment feeds are printed and handed to ``my_handle.process_data``;
        packets of an unhandled type are logged as warnings.
        """
        wssPackage = SocketMessage()
        wssPackage.ParseFromString(message)

        if wssPackage.payloadType == PayloadType.SC_ENTER_ROOM_ACK:
            self.parseEnterRoomAckPack(wssPackage.payload)
            return

        if wssPackage.payloadType == PayloadType.SC_HEARTBEAT_ACK:
            self.parseHeartBeatPack(wssPackage.payload)
            return

        if wssPackage.payloadType == PayloadType.SC_FEED_PUSH:
            datas = self.parseFeedPushPack(wssPackage.payload)
            # A feed push without comments carries nothing to forward.
            if "commentFeeds" not in datas:
                return
            for feed in datas['commentFeeds']:
                content = feed['content']
                user_name = feed['user']['userName']
                print(f"[{user_name}]: {content}")
                my_handle.process_data(
                    {"username": user_name, "content": content}, "comment")
            return

        if wssPackage.payloadType == PayloadType.SC_LIVE_WATCHING_LIST:
            self.parseSCWebLiveWatchingUsers(wssPackage.payload)
            return

        data = json_format.MessageToDict(wssPackage, preserving_proto_field_name=True)
        log = json.dumps(data, ensure_ascii=False)
        logging.warning('[onMessage] [无法解析的数据包⚠️]%s', log)

    def _decodeAndLog(self, protoCls, message: bytes, label: str):
        """Parse ``message`` as ``protoCls``, log it under ``label``, return it as a dict."""
        parsed = protoCls()
        parsed.ParseFromString(message)
        data = json_format.MessageToDict(parsed, preserving_proto_field_name=True)
        # Lazy %-formatting also copes with liveRoomId still being None.
        logging.info('%s [RoomId:%s] | %s', label, self.liveRoomId,
                     json.dumps(data, ensure_ascii=False))
        return data

    def parseEnterRoomAckPack(self, message: bytes):
        """Decode and log the enter-room acknowledgement; return it as a dict."""
        return self._decodeAndLog(
            SCWebEnterRoomAck, message, '[parseEnterRoomAckPack] [进入房间成功ACK应答👌]')

    def parseSCWebLiveWatchingUsers(self, message: bytes):
        """Decode and log a watching-users packet; return it as a dict.

        NOTE(review): the original comment describes this as users entering
        the room, while the log text calls it an unknown packet - confirm.
        """
        return self._decodeAndLog(
            SCWebLiveWatchingUsers, message, '[parseSCWebLiveWatchingUsers] [不知道是啥的数据包🤷]')

    def parseFeedPushPack(self, message: bytes):
        """Decode and log a feed push (live room comments); return it as a dict."""
        return self._decodeAndLog(
            SCWebFeedPush, message, '[parseFeedPushPack] [直播间弹幕🐎消息]')

    def parseHeartBeatPack(self, message: bytes):
        """Decode and log a heartbeat acknowledgement; return it as a dict."""
        return self._decodeAndLog(
            SCHeartbeatAck, message, '[parseHeartBeatPack] [心跳❤️响应]')

    def onError(self, ws, error):
        """Log a WebSocket error, including the error itself."""
        logging.error('[Error] [websocket异常] %s', error)

    def onClose(self, ws, *args):
        """Log that the WebSocket closed.

        ``*args`` absorbs the ``close_status_code`` and ``close_msg`` values
        that websocket-client 1.x passes; older releases pass only ``ws``.
        """
        logging.info('[Close] [websocket已关闭]')

    def onOpen(self, ws):
        """Send the enter-room packet and start the heartbeat thread."""
        data = self.connectData()
        logging.info('[onOpen] [建立wss连接]')
        ws.send(data, websocket.ABNF.OPCODE_BINARY)
        _thread.start_new_thread(self.keepHeartBeat, (ws,))

    def connectData(self):
        """Build the serialized enter-room packet from token, room id and a page id."""
        obj = CSWebEnterRoom()
        obj.payloadType = 200
        obj.payload.token = self.token
        obj.payload.liveStreamId = self.liveRoomId
        obj.payload.pageId = self.getPageId()
        return obj.SerializeToString()

    def heartbeatData(self):
        """Build the serialized heartbeat packet stamped with the current time in ms."""
        obj = CSWebHeartbeat()
        obj.payloadType = 1
        obj.payload.timestamp = int(time.time() * 1000)
        return obj.SerializeToString()

    def keepHeartBeat(self, ws: "websocket.WebSocketApp"):
        """Send a heartbeat every 20 seconds until sending fails.

        Runs in its own thread (started by onOpen). A failed send means the
        connection is gone, so the loop logs the error and ends instead of
        dying with an unhandled exception.
        """
        while True:
            time.sleep(20)
            payload = self.heartbeatData()
            logging.info("[keepHeartBeat] [发送心跳]")
            try:
                ws.send(payload, websocket.ABNF.OPCODE_BINARY)
            except Exception as err:  # thread boundary: log and stop
                logging.error('[keepHeartBeat] [心跳发送失败,停止心跳] %s', err)
                return

    def getPageId(self):
        """Return a page id: 16 random characters, ``_``, then the current time in ms."""
        # Character set taken from the site's JavaScript.
        charset = "-_zyxwvutsrqponmlkjihgfedcba9876543210ZYXWVUTSRQPONMLKJIHGFEDCBA"
        prefix = "".join(random.choice(charset) for _ in range(16))
        return prefix + "_" + str(int(time.time() * 1000))

    def sendMsg(self, content: str, liveStreamId=None, color=None):
        """Post a comment to a live stream.

        Args:
            content: Comment text.
            liveStreamId: Target live stream id.
            color: Comment colour.

        Returns:
            The decoded GraphQL response.
        """
        variables = {
            'color': color,
            'content': content,
            'liveStreamId': liveStreamId
        }
        query = 'mutation SendLiveComment($liveStreamId: String, $content: String, $color: String) {\n sendLiveComment(liveStreamId: $liveStreamId, content: $content, color: $color) {\n result\n __typename\n }\n}\n'
        return self.liveGraphql('SendLiveComment', variables, query)

    def follow(self, principalId=None, type=1):
        """Follow or unfollow a user.

        Args:
            principalId: User id.
            type: 1 to follow, 2 to unfollow.

        Returns:
            The decoded GraphQL response.
        """
        variables = {
            'principalId': principalId,
            'type': type,
        }
        query = 'mutation UserFollow($principalId: String, $type: Int) {\n webFollow(principalId: $principalId, type: $type) {\n followStatus\n __typename\n }\n}\n'
        return self.liveGraphql('UserFollow', variables, query)

    def getUserCardInfoById(self, principalId):
        """Fetch the basic profile card of a user.

        Args:
            principalId: User id.

        Returns:
            The decoded GraphQL response.
        """
        variables = {
            'principalId': principalId,
            'count': 3,
        }
        query = 'query UserCardInfoById($principalId: String, $count: Int) {\n userCardInfo(principalId: $principalId, count: $count) {\n id\n originUserId\n avatar\n name\n description\n sex\n constellation\n cityName\n followStatus\n privacy\n feeds {\n eid\n photoId\n thumbnailUrl\n timestamp\n __typename\n }\n counts {\n fan\n follow\n photo\n __typename\n }\n __typename\n }\n}\n'
        return self.liveGraphql('UserCardInfoById', variables, query)

    def getAllGifts(self):
        """Fetch the list of all gifts; returns the decoded GraphQL response."""
        variables = {}
        query = 'query AllGifts {\n allGifts\n}\n'
        return self.liveGraphql('AllGifts', variables, query)

    def liveGraphql(self, operationName: str, variables, query, headers=None):
        """Send one GraphQL operation to ``apiHost`` and return the decoded JSON.

        Args:
            operationName: GraphQL operation name.
            variables: Variables dict for the operation.
            query: GraphQL document.
            headers: Optional headers to use instead of the instance headers
                plus a JSON content type.

        Returns:
            The decoded JSON response (also written to the log).
        """
        if headers is None:
            # Build a new dict so the shared headers are not polluted with
            # a content-type that the plain GET requests should not carry.
            head = {**self.headers, 'content-type': 'application/json'}
        else:
            head = headers
        data = {
            'operationName': operationName,
            'variables': variables,
            'query': query
        }
        res = requests.post(
            url=self.apiHost, data=json.dumps(data), headers=head,
            timeout=self.requestTimeout,
        ).json()
        logging.info('[liveGraphql] [操作返回数据] | %s', json.dumps(res, ensure_ascii=False))
        return res

    def hexStrToProtobuf(self, hexStr):
        """Decode a hex string as a protobuf message and print its structure.

        Debug helper for analysing captured WebSocket packets. The raw bytes
        are written to the file ``t-proto`` in the working directory and then
        parsed with protobuf_inspector's ``StandardParser``.

        Args:
            hexStr: Hexadecimal text without separators.

        Returns:
            The parser output.
        """
        with open('t-proto', 'wb') as w:
            w.write(binascii.unhexlify(hexStr))
        parser = StandardParser()
        with open('t-proto', 'rb') as fh:
            output = parser.parse_message(fh, 'message')
        print(output)
        return output

    def unHexLify(self, data: str):
        """Decode hex text (spaces between bytes allowed) into a string.

        Debug helper for analysing captured WebSocket packets, e.g.
        ``'E5 8C 97'`` decodes to ``'北'``.

        Args:
            data: Hexadecimal text, optionally with spaces between bytes.

        Returns:
            The decoded text (also printed).
        """
        # str.replace returns a new string; the result must be kept, otherwise
        # unhexlify rejects the spaces.
        data = data.replace(' ', '')
        data = binascii.unhexlify(data).decode()
        print(data)
        return data
# Entry script: listen to the live room configured in config.json.
try:
    ks = Tool()
    # A captured packet can be inspected while debugging with
    # ks.hexStrToProtobuf(hex_string).
    # NOTE(review): hard-coded web session cookie. It is a credential and
    # expires; it should be loaded from configuration instead of source.
    c = 'clientid=3; didv=1675056349580; userId=845495460; kuaishou.live.bfb1s=7206d814e5c089a58c910ed8bf52ace5; did=web_a41c846314016ca1a260444bb3c7d66c; client_key=65890b29; kpn=GAME_ZONE; _did=web_117262730815E9F7; kuaishou.live.web_st=ChRrdWFpc2hvdS5saXZlLndlYi5zdBKgAQkoEnsRiD0ovFwIQ828tvYMhmH6rThiUxM-uuTQtXKjmQEry1dCvI5sEsH9SZt9LNWcvJ_kNRPH2AFvS1awpa65z-Jpe3p2nbMvkpraiJV0WkJrvhLrCyb_CTCNPBGoYwUBaDoabrmZLqLJX-txGbrmUDIblQmR-MKwbPb7uQ5MszR2O3jaon_MtIrqnQA7e0IOBVmJT8N_p-lsiclN4NsaEsa__TMaP0jJgfAfW0kccZcKPyIgmgfFxb6YcCH2fKNK5CO2G4OWyK-WxFeXx6Bx8LA1FGcoBTAB; kuaishou.live.web_ph=8d652450751eaf1d2b61edf08b812bb0a41a; userId=845495460; ksliveShowClipTip=true'
    room_id = my_handle.get_room_id()
    url = 'https://live.kuaishou.com/u/' + room_id
    print(f"监听直播间:{url}")
    # The client must be pointed at the room before it starts: without this
    # call liveUrl stays empty and the page request in getLiveRoomId() fails.
    ks.init(url, c)
    # Start the Kuaishou WebSocket client (blocks while connected).
    ks.wssServerStart()
except KeyboardInterrupt:
    print('程序被强行退出')
except Exception:
    # os._exit() below ends the process before any traceback would be
    # printed, so record the failure explicitly.
    logging.exception('[Main] [程序异常退出]')
finally:
    print('关闭连接...可能是直播已经结束或网络问题可能是直播间不存在或下播或网络问题')
    os._exit(0)
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化