加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
make_pic.py 9.97 KB
一键复制 编辑 原始数据 按行查看 历史
lihanhua 提交于 2024-08-26 15:02 . first commit
import requests
import json
import re
import os
import shutil
from urllib.parse import urlencode
from time import sleep
import sys
import os
import glob
import random
import time
import numpy as np
WX_API_KEY = "7bDUHmBP61IbxEqbUXbCogCo"
WX_SECRET_KEY = "ERxypcJCxFOQaLSyFvuhUkfyg8aWoEYM"
GEN_PIC_API_KEY = "KO02f1J3DosulOzFWQGZjVWF"
GEN_PIC_SECRET_KEY = "UFyAATSo4K6wpkm72QMSReNlHU3pSvIu"
GEN_PIC_HOST = "https://aip.baidubce.com/rpc/2.0/wenxin/v1/basic/"
home_dir = os.path.expanduser("~")
pic_path = os.path.join(home_dir, "图片/temp.png")
def get_access_token(api_key,secret_key):
url = "https://aip.baidubce.com/oauth/2.0/token"
p = {"grant_type": "client_credentials", "client_id": api_key, "client_secret": secret_key}
return str(requests.post(url, params=p).json().get("access_token"))
def assemble_ws_auth_url(requset_url, method="GET", access_token=""):
values = {
"access_token": access_token,
}
return requset_url + "?" + urlencode(values)
def getRequestContent(text, resolution, **kwargs):
body = {
"text": text,
"resolution": resolution
}
for key, value in kwargs.items():
body[key] = value
return body
def getResponeContent(task_id):
body = {
"taskId": task_id
}
return body
def send_request(host, content, access_token):
url = assemble_ws_auth_url(host, "POST", access_token)
response = requests.post(url,json=content,headers={'content-type': "application/json", 'accept': 'application/json'}).text
return response
def picture_generation(params):
params = json.loads(params)
style = params["style"]
text = params["text"]
resolution = params["resolution"]
access_token = get_access_token(GEN_PIC_API_KEY, GEN_PIC_SECRET_KEY)
otherReq = {
"style": style
}
task = send_request(
GEN_PIC_HOST + "textToImage",
getRequestContent(text, resolution,**otherReq),
access_token)
task = json.loads(task)
if "data" in task:
time = 0
while True:
response = send_request(
GEN_PIC_HOST + "getImg",
getResponeContent(task["data"]["taskId"]),
access_token)
response = json.loads(response)
if "data" in response:
if response["data"]["status"] == 0:
sleep(1)
time += 1
elif response["data"]["status"] == 1:
imageName = "wenxin-" + str(response["data"]["taskId"])
savePath = f"images/{imageName}.jpg"
result = json.dumps({
"picture_url": response["data"]["img"]
})
return result
elif "error_code" in response:
break
def download(params):
global pic_path
params = json.loads(params)
picture_url = params['picture_url']
if picture_url.startswith("http://") or picture_url.startswith("https://"):
dir = params['dir']
savePath = dir + picture_url.split('/')[-1].split('?')[0]
res = requests.get(picture_url)
with open(savePath, "wb") as f:
f.write(res.content)
result = json.dumps({
"picture_location": savePath
})
pic_path = savePath
print("pic save: " + savePath)
return result
def match_markdown_code_blocks(text):
code_block_regex = re.compile(r'```(\w+)?\s([\s\S]+?)\s```', re.DOTALL)
code_blocks = code_block_regex.findall(text)
texts = re.sub(code_block_regex, '', text).split('\n')
texts = list(filter(None, texts))
return code_blocks, texts
def func_data_filter(main_code_block):
function = ""
if 'function' in main_code_block:
function = main_code_block['function']
elif '函数名' in main_code_block:
function = main_code_block['函数名']
parameters = dict()
if 'parameters' in main_code_block:
parameters = main_code_block['parameters']
elif '参数' in main_code_block:
parameters = main_code_block['参数']
result = ""
if 'return' in main_code_block:
result = main_code_block['return']
elif '返回' in main_code_block:
result = main_code_block['返回']
return function, parameters, result
def add_params(stdout, code_blocks, i):
main_return = json.loads(stdout)
next_code_block = json.loads(code_blocks[i+1])
for key, value in main_return.items():
if 'parameters' in next_code_block:
next_code_block['parameters'][key] = value
elif '参数' in next_code_block:
next_code_block['参数'][key] = value
code_blocks[i+1] = json.dumps(next_code_block)
def function_call(i, code_blocks):
main_code_block = json.loads(code_blocks[i])
function, parameters, return_name = func_data_filter(main_code_block)
print("Calling function: " + function)
# process = subprocess.Popen(['python', function + '.py', json.dumps(parameters)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
# stdout, stderr = process.communicate()
# if stderr:
# print(f"Error: {stderr}")
# return stderr
stdout = eval(function + "(\'" + json.dumps(parameters) + "\')")
if i == len(code_blocks) - 1:
return
add_params(stdout, code_blocks, i)
def main_call(code_blocks, texts):
i = 0
while (i < len(code_blocks)):
print(texts[i])
stderr = function_call(i, code_blocks)
if stderr:
print(f"Error: {stderr}")
return
i += 1
while (i < len(texts)):
print(texts[i])
i += 1
def get_wx_access_token():
"""
使用 API Key,Secret Key 获取access_token,替换下列示例中的应用API Key、应用Secret Key
"""
url = "https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=" + WX_API_KEY + "&client_secret=" + WX_SECRET_KEY
payload = json.dumps("")
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
return response.json().get("access_token")
# 1280*720
def wenxin(command):
url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-latest?access_token=" + get_wx_access_token()
home_dir = os.path.expanduser('~')
pic_dir = os.path.join(home_dir, "图片/")
prompt = f'''能力与角色:
你现在是一个计算机运维工程师。
背景信息:
操作系统:
1、发行版:银河麒麟V10;
2、桌面环境:Mate。
可供操作的本地地址:{pic_dir}
可用的API:
1、图片生成:百度智能创作平台API(函数:picture_generation;参数:text,resolution(1280*720),style;返回:picture_url)
2、下载:(函数:download;参数:picture_url,dir;返回:picture_location)
用户会提供自然语言指令:
{{command}}
指令:
将用户提供的自然语言指令进行任务分解。
输出风格:
若干条言简意赅的步骤。
输出范围:
1、确保每个步骤对应一Linux指令或调用一次对应的API;
2、若某步骤为调用上述可用的API,则在该步骤后的下一行以json格式给出函数名以及参数名与参数值和返回。
3、任务分解生成一级步骤即可,不需要二级步骤;
4、步骤不能重复;
5、除了步骤,不需要其他任何文字说明、提示、注意事项的生成。
6、以纯文本形式输出'''
content = prompt.replace("{command}", command, 1)
payload = json.dumps({
"messages": [
{
"role": "user",
"content": content
}
]
})
headers = {
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
text = json.loads(response.text)
return text["result"]
def is_connected():
try:
response = requests.get("https://www.www.baidu.com", timeout=1)
return response.status_code == 200
except requests.ConnectionError:
return False
# 使用函数检查网络连接
def make_pic(command: str) -> str:
# if is_connected() == False:
# print("网络连接不正常。")
# return
instruction = wenxin(command)
print(instruction)
# 按步骤顺序匹配函数数据
code_lang_blocks, texts = match_markdown_code_blocks(instruction)
code_blocks = [code_block[1] for code_block in code_lang_blocks]
# 按步骤顺序运行函数
main_call(code_blocks, texts)
return pic_path
def get_random_final_png_file(directory):
# 构造搜索路径
search_path = os.path.join(directory, '*final.png')
# 使用 glob 模块查找所有匹配的文件
files = glob.glob(search_path, recursive=False)
if not files:
return None # 如果没有找到任何文件,返回 None
# 从匹配的文件中随机选择一个
random_file = random.choice(files)
return random_file
# def make_pic(command: str) -> str:
# global pic_path
# # 示例用法
# directory_path = '~/图片'
# random_final_png_file = get_random_final_png_file(directory_path)
# if random_final_png_file:
# pic_path = random_final_png_file
# start_time = time.time()
# wait_time = random.randint(28000, 39000)
# time.sleep(wait_time / 1000.0)
# end_time = time.time()
# elapsed_time = end_time - start_time
# elapsed_time = np.round(elapsed_time, decimals=3)
# print(f"图片生成时间: {elapsed_time} 秒")
# return pic_path
# make_pic("生成一张大山飞机白云的美景图片")
if __name__ == "__main__":
# command = sys.argv[1] # 获取第一个参数
# make_pic(command)
make_pic("请帮我生成一张高山流水的美景图片")
home_dir = os.path.expanduser("~")
des_path = os.path.join(home_dir, ".kylin-actuator/temp.png")
if pic_path != None:
shutil.copy2(pic_path, des_path)
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化