加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
tangbaowss.py 4.67 KB
一键复制 编辑 原始数据 按行查看 历史
Xoipz 提交于 2024-09-19 17:25 . 自动提交 (周四 2024-09-19 17-25)
import asyncio
import configparser
import ctypes
import random
import subprocess
import sys
import threading
import time
import websockets
import state
import os
current_file_path = __file__
directory_path = os.path.dirname(current_file_path)
root_path = os.path.join(directory_path, "..")
sys.path.append(root_path)
from lanrenauto.logger_module import logger
#躺宝wss通讯模块
# 存储所有已连接的客户端
connected_clients = []
new_msg = ""
async def handle_client(websocket, path):
global new_msg
try:
# 发送连接成功消息
await websocket.send("连接成功")
# 添加新连接的客户端到集合中
connected_clients.append(websocket)
while True:
# 接收客户端发送的消息
message = await websocket.recv()
new_msg = message
# print(f"收到消息:{message}")
# 处理接收到的消息
if message == "是否回放中 真":
state.状态_是否回放中 = True
elif message == "是否回放中 假":
state.状态_是否回放中 = False
elif message == "是否暂停 真":
state.状态_是否暂停 = True
elif message == "是否暂停 假":
state.状态_是否暂停 = False
elif message == "是否开始录制 假":
state.状态_是否开始录制 = False
elif message == "是否开始录制 真":
if state.状态_是否禁止录制 == False:
logger.info("请按F8结束录制")
state.QT_信号.mysig_tishi.emit(f"请按F8结束录制")
state.状态_是否开始录制 = True
elif message == "是否禁止录制 假":
state.状态_是否禁止录制 = False
elif message == "是否禁止录制 真":
state.状态_是否禁止录制 = True
elif message[:5] == "录制的脚本":
state.录制_脚本文本 = message[6:]
if state.录制_脚本文本 != "":
logger.info("录制完毕!")
# print(message)
finally:
# 客户端断开连接后,将其移出集合
connected_clients.remove(websocket)
async def send_to_client(client_id, message):
if len(connected_clients) > 0:
# 查找指定的客户端
await connected_clients[client_id].send(message)
async def server_main():
try:
if len(state.DUANGKOUHAO) < 3:
state.DUANGKOUHAO = str(int(random.randint(1000, 19999)))
server = await websockets.serve(handle_client, "localhost", int(state.DUANGKOUHAO), max_size=1024 * 1024 * 10)
except:
kernel32 = ctypes.windll.kernel32
kernel32.SetConsoleMode(kernel32.GetStdHandle(-10), 7)
state.DUANGKOUHAO = input(f"端口 {state.DUANGKOUHAO} 被占用了! 请输入新的端口号(然后手动重开):")
if len(state.DUANGKOUHAO) < 3:
state.DUANGKOUHAO = str(int(random.randint(1000, 19999)))
try:
# 创建 ConfigParser 对象
config = configparser.ConfigParser()
# 添加节和键-值对
config['seting'] = {
'DUANGKOUHAO': state.DUANGKOUHAO
}
# 写入配置到 INI 文件
with open("./datas/setting.ini", 'w') as configfile:
config.write(configfile)
except:
pass
sys.exit()
try:
time.sleep(5)
# 启动exe程序
subprocess.Popen('"' + state.LIANZHAOFUWU + '" "' + state.DUANGKOUHAO + '"')
logger.info("躺宝连招插件服务已经启动!")
except:
logger.error("文件没找到可以能被杀毒软件干掉了 " + state.LIANZHAOFUWU)
input("回车结束!", state.LIANZHAOFUWU)
sys.exit()
# 启动事件循环,处理客户端连接
await asyncio.gather(server.wait_closed())
def send_msg(msg="是否回放#@@#假"):
'''
给躺宝发送指令
全部指令 :
是否回放#@@#假 /真
是否暂停#@@#假 /真
是否开始录制#@@#假 /真
是否禁止录制#@@#假 /真
解析脚本#@@#jiaoben" jiaoben就是录制的脚本文本 不是文件 是直接文字
全局hwnd#@@#12345 12345就是游戏的窗口句柄
:param msg: 指令 用 #@@# 分割
:return:
'''
asyncio.run_coroutine_threadsafe(send_to_client(-1, msg), loop)
#开线程启动监听
loop = asyncio.new_event_loop()
# 启动 WebSocket 服务器
t = threading.Thread(target=loop.run_until_complete, args=(server_main(),))
if state.python_var > 9:
t.daemon = True
else:
t.setDaemon(True)
t.start()
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化