Files
examples
ssl
.gitignore
LICENSE
README.md
chat_record.py
cpm9g_tokenizer.py
database.xlsx
distributor.py
dprint.py
global_var.py
judge_template.py
localhost.pem
mail_dict.py
main.py
make_pic.py
mn_client.py
model.py
rag.py
requirements.txt
scene.py
session_manager.py
sqlite_tool.py
sub_process.py
test.py
test.py
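"""End-to-end test of directive matching backed by two local models.

A user question is embedded with SentenceTransformer and matched against an
instruction database (database.xlsx) through a FAISS flat index; on a hit the
stored directive is returned, otherwise the question goes back to the language
model itself (MiniCPM3-4B in work_A, GPTQ-quantized CPM9G 8B in work_B).
"""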
import time
from typing import List
import faiss
import pandas as pd
from auto_gptq import AutoGPTQForCausalLM
from sentence_transformers import SentenceTransformer
import re
import numpy as np
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
import os
from cpm9g_tokenizer import CPM9GTokenizer
cur_dir = os.path.split(os.path.abspath(__file__))[0]
device = "cuda"
class Model:
    def __init__(self, name: str):
        self.name = name
        self.model = None
        self.tokenizer = None
        self.except_count = 0

    def load_model(self) -> None:
        pass

    def unload_model(self) -> None:
        if self.model is not None:
            print("unload model")
            del self.model
            self.model = None

    def judge(self, question: str) -> str:
        return "none"

class MiniCPM3_4BModel(Model):
    def __init__(self, name: str):
        super().__init__(name)

    def process_exception(self) -> str:
        # count consecutive failures and return a canned reply
        self.except_count += 1
        if self.except_count < 3:
            output = "我出现异常了"
        else:
            output = "还是有异常, 建议重启服务"
        return output

    def load_model(self) -> None:
        self.unload_model()
        print(f"{self.name}模型加载中...")
        start_time = time.time()
        self.model = AutoModelForCausalLM.from_pretrained(
            cur_dir + "/MiniCPM3-4B", torch_dtype=torch.bfloat16,
            device_map=device, trust_remote_code=True)
        self.model.eval()
        self.tokenizer = AutoTokenizer.from_pretrained(cur_dir + "/MiniCPM3-4B", trust_remote_code=True)
        end_time = time.time()
        elapsed_time = np.round(end_time - start_time, decimals=3)
        print(f"模型加载时间: {elapsed_time} 秒")

    def judge(self, question: str) -> str:
        if self.model is None:
            self.load_model()
        start_time = time.time()
        try:
            model_inputs = self.tokenizer.apply_chat_template(
                [{"role": "user", "content": question}],
                return_tensors="pt", add_generation_prompt=True).to(device)
            model_outputs = self.model.generate(model_inputs, max_new_tokens=1024, top_p=0.7, temperature=0.7)
            # keep only the newly generated tokens (strip the prompt)
            output_token_ids = [model_outputs[i][len(model_inputs[i]):] for i in range(len(model_inputs))]
            output = self.tokenizer.batch_decode(output_token_ids, skip_special_tokens=True)[0]
        except Exception:
            output = self.process_exception()
        end_time = time.time()
        elapsed_time = np.round(end_time - start_time, decimals=3)
        print(f"模型(minicpm)推理时间: {elapsed_time} 秒")
        return output

    def inference(self, data: List[str], prompt: str) -> str:
        if self.model is None:
            self.load_model()
        start_time = time.time()
        model_input = prompt + "".join(data) + "<AI>"
        text = self.judge(model_input)
        end_time = time.time()
        elapsed_time = np.round(end_time - start_time, decimals=3)
        # keep the text after the last '<AI>' marker, minus the trailing 4 characters (end token)
        output = text.split('<AI>')[-1][:-4]
        print(f"模型(ask)推理时间: {elapsed_time} 秒")
        print(output)
        return output

class CPM9GGPTQQuantModel(Model):
    def __init__(self, name: str):
        super().__init__(name)

    def process_exception(self) -> str:
        # count consecutive failures and return a canned reply
        self.except_count += 1
        if self.except_count < 3:
            output = "我出现异常了"
        else:
            output = "还是有异常, 建议重启服务"
        return output

    def load_model(self) -> None:
        self.unload_model()
        print(f"{self.name}模型加载中...")
        start_time = time.time()
        self.model = AutoGPTQForCausalLM.from_quantized(
            f"{cur_dir}/hf-8b-v2-gptq/4bits-128g-desc_act_True-damp_0.01", device="cuda:0")
        self.model.eval()
        self.tokenizer = CPM9GTokenizer(f"{cur_dir}/hf-8b-v2-gptq/4bits-128g-desc_act_True-damp_0.01/vocabs.txt")
        end_time = time.time()
        elapsed_time = np.round(end_time - start_time, decimals=3)
        print(f"模型加载时间: {elapsed_time} 秒")

    def judge(self, question: str) -> str:
        if self.model is None:
            self.load_model()
        start_time = time.time()
        try:
            input_ids = torch.tensor([[self.tokenizer.bos_id] + self.tokenizer.encode(question)[:4096]]).cuda()
            output = self.model.generate(inputs=input_ids, max_new_tokens=2048, do_sample=False, num_beams=1)[0].tolist()
        except Exception:
            # generation failed: return the canned reply directly instead of decoding it
            return self.process_exception()
        # take the text after the last '<AI>' marker, drop the 4-char end token, then
        # strip everything except CJK characters, letters, digits, spaces and ';'
        text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9 ;]', '', self.tokenizer.decode(output).split('<AI>')[-1][:-4])
        end_time = time.time()
        elapsed_time = np.round(end_time - start_time, decimals=3)
        print(f"模型(judge)推理时间: {elapsed_time} 秒")
        return text

    def inference(self, data: List[str], prompt: str) -> str:
        if self.model is None:
            self.load_model()
        start_time = time.time()
        model_input = prompt + "".join(data) + "<AI>"
        try:
            input_ids = torch.tensor([[self.tokenizer.bos_id] + self.tokenizer.encode(model_input)[:4096]]).cuda()
            output = self.model.generate(inputs=input_ids, max_new_tokens=2048, do_sample=False, num_beams=1)[0].tolist()
        except Exception:
            return self.process_exception()
        end_time = time.time()
        text = self.tokenizer.decode(output)
        elapsed_time = np.round(end_time - start_time, decimals=3)
        output = text.split('<AI>')[-1][:-4]
        print(f"模型(ask)推理时间: {elapsed_time} 秒")
        print(output)
        return output

class QueryDatabase:
    def __init__(self, model):
        self.model = model

    def query(self, job):
        # 1. Load the data (vector database); note: re-read and re-embedded on every call
        df = pd.read_excel('database.xlsx', header=0)
        texts = df.iloc[:, 0].values
        # 2. Embed the texts
        model = SentenceTransformer(f"{cur_dir}/paraphrase-multilingual-MiniLM-L12-v2")
        text_vectors = model.encode(texts, convert_to_numpy=True)
        d = text_vectors.shape[1]  # embedding dimension
        # 3. Build a flat index using L2 distance
        index = faiss.IndexFlatL2(d)
        # 4. Add the vectors to the index
        index.add(text_vectors)
        # 5. Search for the single nearest neighbour
        query_vector = model.encode([job], convert_to_numpy=True)
        distances, indices = index.search(query_vector, 1)
        # distance threshold for accepting a match
        threshold = 2
        temp = 0
        # inspect the distance against the threshold
        # print(distances[0][0], threshold)
        if distances[0][0] > threshold:
            print("未在数据库中匹配该向量!")
            if "查找" in job or "文件" in job:
                temp = 2  # no match, but the question looks like a file search
            else:
                temp = 1  # no match at all
        text = texts[indices[0][0]]
        database_text = df[df['Key'] == text].iloc[:, :].values.tolist()[0]
        directive = database_text[1]
        if temp == 1:
            return ""
        elif temp == 2 or directive.startswith('find'):
            def find_param(question, directive):
                judge_directive_template = '''提取"{question}"这句话中的时间参数,如“1天、3天、一周、一个月、半个月、一天、一年、2年、三周”等这一类的时间参数。结果转换成以天为单位的时间,最终返回的格式为'时间参数:###天'。回答:<AI>'''
                directive_input = judge_directive_template.format(question=question, directive=directive)
                text = self.model.judge(directive_input)
                # keep only the digits from the model's answer after the '时间参数:' label
                output = re.sub(r'[^0-9]', '', text.split('<AI>')[-1][:-4].split(':')[-1])
                print(output, text)
                if output == '':
                    return 3  # default to 3 days when no time parameter is found
                return output

            param = find_param(job, directive)
            return directive + ' -{}'.format(param)
        return directive

def work_A():
    # create the model
    minicpm3_4b = MiniCPM3_4BModel("MiniCPM3_4B模型微调")
    querydatabase = QueryDatabase(minicpm3_4b)
    # single example
    question = "如何唱好一首歌"
    base = '''{question}'''
    simplify_question = minicpm3_4b.judge(base.format(question=question))
    print(simplify_question)
    complete_directive = querydatabase.query(simplify_question)
    if complete_directive:
        print("匹配操作指令成功:", complete_directive)
    else:
        print("未匹配操作指令,继续提问大模型")
        ans = minicpm3_4b.judge(base.format(question=question))
        print(ans)

def work_B():
    cpm8b2gptq = CPM9GGPTQQuantModel("九格8B-GPTQ量化微调")
    querydatabase = QueryDatabase(cpm8b2gptq)
    question = "开启深色模式"
    base = '''<用户>将我的问题,精炼成短句更加的简洁(命令式),保留参数,回答的结果更倾向于‘做’,而不是‘怎么做’‘为什么做’‘如何做’,如果有多个问题任务,则按';'切分输出。
示例如下“ 问题:请将分辨率设置为1920x1080。回答:分辨率设置为1920x1080;
问题:将显示器方向设置为纵向。回答:屏幕方向设置为纵向;
问题:把当前显示器设置为主屏幕。回答:设置主屏幕;
问题:请开启屏幕镜像功能。回答:设置镜像屏幕;
问题:把显示器关闭时间调整到10分钟。回答:关闭显示器时间设置为10分钟;
问题:如何设置按键延迟速度?回答:按键延迟速度设置;
问题:请将按键延迟速度设置为200毫秒。回答:按键延迟速度设置为200毫秒;
问题:把显示内容镜像到另一个屏幕。回答:设置镜像屏幕;
问题:如何调整屏幕刷新率?回答:设置刷新率;
问题:把屏幕缩放到200%。回答:屏幕缩放至200%;
问题:我该怎么设置UKUI休息时间显示?答案:开启UKUI休息时间显示;
问题:截图。答案:截图;”
用户输入:{question}
回答:<AI>'''
    simplify_question = cpm8b2gptq.judge(base.format(question=question))
    print(simplify_question)
    complete_directive = querydatabase.query(simplify_question)
    if complete_directive:
        print("匹配操作指令成功:", complete_directive)
    else:
        print("未匹配操作指令,继续提问大模型")
        ans = cpm8b2gptq.judge(base.format(question=question))
        print(ans)

if __name__ == '__main__':
    work_A()
    # work_B()
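
# Reference: the spreadsheet layout QueryDatabase.query expects. The 'Key'
# column (the matchable phrases) is confirmed by the lookup above, and the
# directive is read from the second column; the column name 'Directive' and
# the sample rows below are illustrative assumptions, a minimal sketch only:
#
#   import pandas as pd
#   rows = [
#       {"Key": "设置镜像屏幕", "Directive": "..."},
#       {"Key": "查找最近的文件", "Directive": "find ..."},  # 'find' directives get a ' -<days>' suffix appended
#   ]
#   pd.DataFrame(rows).to_excel("database.xlsx", index=False)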