代码拉取完成,页面将自动刷新
import json
from chat_record import ChatRecord
from model import Model, CPM9GGPTQQuantModel, MiniCPM3_4BModel, QueryDatabase
from scene import Scene, TaskScene, TranslateScene, Workword_Scene
from session_manager import session_manager
from typing import List, Dict
from rag import Rag
import locale
import os
import time
import numpy as np
cur_dir = os.path.split(os.path.abspath(__file__))[0]
mn_MN = "mn_MN"
def contains_mongolian(text):
# 定义蒙古语字符的Unicode范围
mongolian_ranges = [
(0x1800, 0x18AF), # 蒙古文
(0x18B0, 0x18FF) # 扩展蒙古文
]
if "ᠭᠣᠶᠣ ᠭᠠᠵᠠᠷ ᠬᠠᠮᠢᠭᠠ ᠪᠠᠢᠬᠤ " in text:
return True
# for char in text:
# code_point = ord(char)
# for start, end in mongolian_ranges:
# if start <= code_point <= end:
# return True
return False
class SessionVar:
def __init__(self, id: int, name:str, model: str, scene: str, context: bool, rag: bool):
self.session_id = id
self.session_name = name
self.session_model = model
self.session_scene = scene
self.session_context = context
self.session_rag = rag
self.chat_record = ChatRecord()
class GlobalVar:
def __init__(self):
self.settings = {}
self.is_wss = False
self.session_id = 0
self.scenes = {}
self.models = {}
self.sessions = {}
self.rag = Rag()
self.__load_setting()
self.__load_models()
self.__load_scenes()
self.load_sessions()
self.cur_file = ""
self.make_pic = ""
self.last_content = ""
self.input = ""
self.lang = "zh_CN"
self.next = ""
def unload_model(self) -> None:
print(self.models)
for name,model in self.models.items():
model.unload_model()
def is_exist_session(self, session_id: int) -> bool:
return True if session_id in self.sessions else False
#read scene
def is_exist_scene(self, scene_name: str) -> bool:
return True if scene_name in self.scenes else False
def scene_name(self, session_id: int) -> str:
session: SessionVar = self.sessions[session_id]
return session.session_scene
def scene_content_with_question(self, session_id: int, question: str, is_rag: bool) -> str:
session: SessionVar = self.sessions[session_id]
scene_name = session.session_scene
scene: Scene = self.scenes.get(scene_name, None)
return scene.scene_content_with_question(question, is_rag) if scene != None else question
def scene_prompt_with_question(self, session_id: int, question: str) -> str:
session: SessionVar = self.sessions[session_id]
scene_name = session.session_scene
scene: Scene = self.scenes.get(scene_name, None)
template = scene.prompt if scene != None else ""
if "{context}" in template:
relevant_content = self.rag.rag_cotent_with_question(question)
final_prompt = template.replace("{context}", relevant_content)
return final_prompt
else:
return template
def __scene_context(self, session_id: int) -> bool:
session: SessionVar = self.sessions[session_id]
scene_name = session.session_scene
scene: Scene = self.scenes.get(scene_name, None)
return scene.is_context() if scene != None else False
def all_scene_infos(self) -> List[Dict[str, str]]:
scenes = []
for scene in self.scenes.values():
if isinstance(scene, Scene):
dict = {
"name": scene.name,
"description": scene.description,
}
scenes.append(dict)
return scenes
#read model
def is_exist_model(self, model_name: str) -> bool:
return True if model_name in self.models else False
def all_model_names(self) -> List[str]:
return list(self.models.keys())
#read context
def is_context(self, session_id: int) -> bool:
session: SessionVar = self.sessions[session_id]
return session.session_context and self.__scene_context(session_id)
def context(self, session_id: int) -> List[str]:
session: SessionVar = self.sessions[session_id]
chat_record: ChatRecord = session.chat_record
return chat_record.get_records() if chat_record != None else []
#read rag
def is_rag(self, session_id: int) -> bool:
session: SessionVar = self.sessions[session_id]
return session.session_rag
def load_rag(self) -> None:
self.rag.load_rag()
def clear_rag(self) -> None:
self.rag.clear_rag()
#write
def insert_record(self, session_id: int, content: str) -> None:
session: SessionVar = self.sessions[session_id]
chat_record: ChatRecord = session.chat_record
chat_record.insert_record(content)
#action
def wps_inference(self, question: str) -> str:
model: Model = self.models.get("九格8B-GPTQ量化", None)
return model.wps_inference(question) if model != None else "大语言模型找不到了!"
def inference(self, session_id: int, data: List[str], prompt: str) -> str:
session: SessionVar = self.sessions[session_id]
model_name: str = session.session_model
model: Model = self.models.get(model_name, None)
return model.inference(data, prompt) if model != None else "大语言模型找不到了!"
#juege
def judge(self, session_id: int, question: str) -> str:
session: SessionVar = self.sessions[session_id]
model_name: str = session.session_model
model: Model = self.models.get(model_name, None)
# 简化问题
judge_str = model.judge(question)
# 匹配指令
start_time = time.time()
judge_str = QueryDatabase(model).query(judge_str)
end_time = time.time()
elapsed_time = np.round(end_time - start_time, decimals=3)
print(f"向量数据库匹配时间: {elapsed_time} 秒")
return judge_str
def load_sessions(self) -> None:
rows = session_manager.session_list()
for row in rows:
self.sessions[row[0]] = SessionVar(row[0], row[1], row[2], row[3], True if row[4] == 1 else False, True if row[5] == 1 else False)
def __load_scenes(self) -> None:
commonPrompt = '''你是openKylin操作系统的语音助手小麒,具备通用的知识问答能力,还具备特点领域知识、本地知识检索增强、系统交互、语音交互、指令执行等能力,介绍你自己的时候请简明扼要。
跟大家再见时请回答:好的,再见!感谢大家的参与,希望我们下次再见面时能分享更多的收获和喜悦,祝大家有美好的一天!
如果有上下文,请基于上下文回答,上下文:{context}'''
self.scenes["通用"] = Scene("通用", "通用场景", "", commonPrompt)
self.scenes["英语翻译"] = TranslateScene("英语翻译", "将任意语言翻译为英语", "请将以上内容翻译为英文", "你是一位英语专家,会将用户说的每一句话,无论什么语言,都会用专业的地道的英语表达出来。")
# taskPrompt = '''你是openKylin操作系统的AI助手小麒,具备通用的知识问答能力,还具备特点领域知识、本地知识检索增强、系统交互、语音交互、指令执行等能力。如果有上下文,请基于上下文回答,上下文:
# 设置浅色模式:"set light"好的,已经设置为浅色模式
# 打开浏览器:"open browser"好的,正在打开浏览器
# 打开邮箱:"open mail"好的,正在打开邮箱
# 打开文件管理器:"open fileManager"好的,正在打开文件管理器
# 打开www.163.com:"open www.163.com"好的,正在打开网页www.163.com
# 打开蓝牙:"open bluetooth"好的,蓝牙已打开
# 关闭蓝牙:"close bluetooth"好的,蓝牙已关闭
# 声音调大:"up volume"好的,正在调大声音
# 声音调小:"down volume"好的,正在调小声音
# 声音调到最大:"top volume"好的,正在调大声音
# 声音调到最小:"bottom volume"好的,正在调大声音
# 静音:"close volume"好的,已设置静音
# 将屏幕调亮:"up brightness"好的,正在将屏幕调亮
# 将屏幕调暗:"down brightness"好的,正在将屏幕调暗
# 屏幕调到最亮:"top brightness"好的,正在将屏幕调到最亮
# 屏幕调到最暗:"bottom brightness"好的,正在将屏幕调到最亮
# 等待10秒:"wait 10"等待10秒
# 等待16秒:"wait 16"等待10秒
# 等待1分钟:"wait 60"等待1分钟
# 等待1小时:"wait 3600"等待1小时
# 重启:"reboot"正在重启
# 关机:"shutdown"正在关机
# 睡眠:"sleep"准备进入睡眠
# 截屏:"screenshot"好的,正在截屏
# 播放:"play"播放
# 暂停:"pause"暂停
# 停止:"pause"停止
# 播放上一首:"play previous"播放上一首
# 播放下一首:"play next"播放下一首'''
# self.scenes["任务执行"] = TaskScene("任务执行", "执行用户指定的任务", "", taskPrompt)
self.scenes["中文翻译"] = TranslateScene("中文翻译", "将任意语言翻译为中文", "请将以上文本翻译为中文", "你是一位汉语专家,会将用户说的每一句话,无论什么语言,都会用专业的地道的汉语表达出来")
self.scenes["语法检查"] = Scene("语法检查", "检查文本中是否存在语法等错误", "请检查以上文本语句是否通顺,是否存在语法等错误", "你是一位语言专家,会检查用户说的每一句话,语句是否通顺,是否存在语法等错误,指出来并提出改进")
self.scenes["文本扩充"] = Scene("文本扩充", "扩充文本内容", "请补充丰富以上内容,并完善", "你是一位语言专家,针对用户说的内容,补充、丰富并完善")
self.scenes["摘要生成"] = Scene("摘要生成", "基于内容提取摘要", "请根据以上内容生成摘要", "你是一位语言专家,针对用户说的内容,生产摘要")
self.scenes["生成周报"] = Workword_Scene("生成周报", "基于内容生成周报",'''
请帮我根据内容生成特定格式周报,周报格式如下:
日期-主题
1,
2,
内容:【】
''',"你是一位语言专家,针对用户说的内容,生成高质量周报")
self.scenes["意图识别"] = Scene("意图识别", "基于内容意图识别", "识别用户所表达的意图", '''你是语言专家,能根据内容判定其意图,意图可以分类为信息寻求、问题解决、社交互动、情感支持、建议与推荐、意见表达、学习与成长、交易与购买、翻译、操作请求、任务请求等,内容:【】''')
def __load_models(self) -> None:
self.models["九格8B-GPTQ量化"] = CPM9GGPTQQuantModel("九格8B-GPTQ量化")
self.models['MiniCPM'] = MiniCPM3_4BModel("MiniCPM量化")
def load_model(self) -> None:
lang_code, encoding = locale.getdefaultlocale()
if lang_code == mn_MN or self.lang == mn_MN:
model: Model = self.models.get("九格蒙语", None)
model.load_model()
else:
session: SessionVar = self.sessions[self.session_id]
model_name: str = session.session_model
model: Model = self.models.get(model_name, None)
model.load_model()
def __load_setting(self) -> None:
# 先读取设置
try:
# 尝试打开并读取 settings.json 文件
with open(f'{cur_dir}/settings.json', 'r') as f:
self.settings = json.load(f)
except FileNotFoundError:
# 如果文件不存在,使用默认设置
print("settings.json不存在")
self.is_wss = True if self.settings.get('is_wss', 'false') == "yes" else False
self.session_id = int(self.settings.get('session_id', '0'))
def update_setting(self) -> None:
# 更新设置
self.settings['is_wss'] = "yes" if self.is_wss else "no"
self.settings['session_id'] = self.session_id
# 写回文件
with open(f'{cur_dir}/settings.json', 'w') as f:
json.dump(self.settings, f)
gvar = GlobalVar()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。