代码拉取完成,页面将自动刷新
# coding:utf-8
# 百度智能云语音批量识别m4a转txt(也可扩展其他音频格式,增加一两句代码即可)
# 作者:雪山凌狐
# 版本:1.1
# 更新时间:2020-02-15
# 网址:http://www.xueshanlinghu.com
# 使用说明请见README.md文件
import subprocess
import json
import os
import time
from decimal import Decimal
from aip import AipSpeech
# 配置部分=========================================
# 将你要识别的m4a文件(或其他你测试后支持的音频格式)放到本程序目录下的设定文件夹下,待识别。你也可以自己修改封装更通用的版本(我这里是为了简便)
asr_folder_name = "audio"
# 结束后是否删除pcm识别文件?推荐删除,否则可能会很多,下次可以再生成嘛
DELETE_PCM = True
""" 你的 APPID AK SK """
APP_ID = '你的 App ID'
API_KEY = '你的 Api Key'
SECRET_KEY = '你的 Secret Key'
# QPS(每秒请求数)限制,免费版目前是5。如果你是无限的QPS,可以把这个值调得非常大
QPS = 5
# 配置部分=========================================
client = AipSpeech(APP_ID, API_KEY, SECRET_KEY)
def get_piece(duration, piece_len=59):
"""获取总的切片长度,每片的长度为piece_len"""
pieces = duration // piece_len + 1
return int(round(pieces, 0))
# 读取文件
def get_file_content(filePath):
"""读取文件,通过二进制的方式读入"""
with open(filePath, 'rb') as fp:
return fp.read()
# 如果你测试能够支持转换mp3,wav,amr等格式,可以在下面的定义中添加即可(我没测试过,不确定),如ext=["m4a", "mp3", "wav", "amr"]
def get_filelist(folder_name, ext=["m4a"]):
"""遍历限定的文件夹下的所有文件完整路径"""
if not(folder_name.endswith("\\")):
folder_name += "\\"
final_filelist = []
filelist = os.listdir(folder_name)
# print(filelist)
for i in filelist:
path = folder_name + i
if os.path.isfile(path):
if os.path.splitext(i)[1].replace(".", "") in ext:
final_filelist.append(path)
elif os.path.isdir(path):
# 进行子目录遍历,递归
child_filelist = get_filelist(path, ext)
final_filelist += child_filelist
return final_filelist
def get_process(process_num, total_num, percent_sign="%"):
"""获取百分比进度"""
def remove_exponent(num, ndigits=2):
"""如果数字结尾有多余的0,把多余的0去掉,方法摘自网络,做了部分修改改进"""
num = Decimal(num)
return round(num.to_integral(), ndigits) if num == num.to_integral() else round(num.normalize(), ndigits)
process = round(process_num / total_num * 100, 2)
process = remove_exponent(process)
return str(process) + percent_sign
def get_process_sign(process_num, total_num, sign="="):
"""获取百分比进度对应的进度条"""
process = get_process(process_num, total_num, percent_sign="")
process = int(round(float(process), 0))
process = process // 2
return sign * process + ">"
if __name__ == "__main__":
# 获取项目开始时间
start_time = time.time()
project_path = os.path.dirname(os.path.abspath(__name__)) + "\\" + asr_folder_name
# print(project_path)
filenames = get_filelist(project_path)
# print(filenames)
file_len = len(filenames) # 有几个文件要识别的
finished_count = 0 # 完成了几个文件初始化
QPS_count = 0 # 目前的请求数初始化
QPS_start_time = time.time() # 目前开始请求的时间,初始化
for cur_file in filenames:
# 获取文件路径、文件名等信息
cur_file_folder = os.path.dirname(cur_file) + "\\"
cur_file_name = os.path.basename(cur_file)
cur_file_name_withoutext = os.path.splitext(cur_file_name)[0]
# print(cur_file_folder, cur_file_name, cur_file_name_withoutext)
# 获取音频时长信息duration
cmd = 'ffprobe -v quiet -print_format json -show_streams "%s"' % cur_file
sub = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True)
# print(cmd)
out, err = sub.communicate()
sub.kill()
del sub
# print(out)
out = json.loads(out)
duration = out.get("streams")[0].get("duration")
print(cur_file, "持续时间为:", duration, "秒")
# 获取切片数
pieces = get_piece(float(duration))
print("切片数:", pieces)
# 进行PCM格式转换。每59秒切片一次,因为目前百度智能云的语音识别最长支持单语音文件是60秒
piece_len = 59
result_name = cur_file_folder + cur_file_name_withoutext + ".txt"
if os.path.exists(result_name):
os.remove(result_name)
with open(result_name, "a", encoding="utf-8") as f:
for i in range(pieces):
# 转码成pcm格式的文件,转换成百度智能云的语音识别能识别的pcm格式编码率等
# 目前百度智能云语音识别要求的pcm格式的参数为:原始 PCM 的录音参数必须符合 16k 采样率、16bit 位深、单声道
pcm_filename = cur_file_folder + cur_file_name_withoutext + ("_" + str(i + 1) if i > 0 else "") + ".pcm"
cmd = 'ffmpeg -y -ss %s -t %s -i "%s" -avoid_negative_ts 1 -acodec pcm_s16le -f s16le -ac 1 -ar 16000 "%s"' \
% (i * 59, piece_len, cur_file, pcm_filename)
# print(cmd)
sub = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = sub.communicate()
sub.kill()
del sub
# 进行云端识别调用百度api
print("云端识别[" + pcm_filename + "]这个文件中...")
res = client.asr(get_file_content(pcm_filename), 'pcm', 16000)
if res.get("err_no") == 0:
result = res.get("result")
for cur_line in result:
print("[识别结果]", cur_line + "\n")
f.write(cur_line + "\n")
else:
print(pcm_filename, "识别报错!")
print(res)
# 检查机制,避免每秒请求数过大
QPS_count += 1
if QPS_count >= QPS:
QPS_end_time = time.time()
if QPS_end_time - QPS_start_time < 1:
print("休息一秒,QPS(每秒请求数)有限制,目前设定为:" + str(QPS) + ",避免频繁调用报错..." + "\n")
# 实际休息1.1秒,更保险
time.sleep(1.1)
# 重新新的一轮统计
QPS_start_time = time.time()
QPS_count = 0
print("[" + result_name, "]文件写入结束!" + "\n")
finished_count += 1
print("*" * 40)
print("当前已完成进度为:" + get_process_sign(finished_count, file_len) + get_process(finished_count, file_len))
print("*" * 40 + "\n")
# 处理PCM中间文件
if DELETE_PCM:
print("正在删除pcm中间文件...")
filenames = get_filelist(project_path, ext=["pcm"])
for file in filenames:
os.remove(file)
print("删除完毕!" + "\n")
end_time = time.time()
print("总共%s个文件全部识别完毕!用时%s秒,感谢您的使用!" % (file_len, round(end_time - start_time, 2)))
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。