加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
database_utils.py 21.50 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627
import sqlite3
from datetime import datetime
import os
import time
import logging
# Location of the SQLite database file opened by the helpers below.
DB_PATH = './db/data.db'
MAX_RETRIES = 15 # Maximum number of attempts made by with_retry
# Module-level logger shared by all helpers in this file.
logger = logging.getLogger(__name__)
def with_retry(func):
    """Decorate ``func`` so that SQLite operational errors are retried.

    The wrapped callable is invoked up to ``MAX_RETRIES`` times. Every
    ``sqlite3.OperationalError`` is logged and followed by a 2-second pause
    before the next attempt; any other exception propagates immediately.

    Args:
        func: The callable to protect.

    Returns:
        A wrapper that forwards all arguments to ``func`` and keeps its
        name and docstring.

    Raises:
        Exception: When every attempt failed with ``sqlite3.OperationalError``.
            The last such error is attached as ``__cause__``.
    """
    import functools  # local import: functools is not imported at module level

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        last_error = None
        for attempt in range(1, MAX_RETRIES + 1):
            try:
                return func(*args, **kwargs)
            except sqlite3.OperationalError as e:
                last_error = e
                logger.error(f"Database operation failed: {e}. Retry {attempt}/{MAX_RETRIES}.")
                # Waiting is pointless once the final attempt has failed.
                if attempt < MAX_RETRIES:
                    time.sleep(2)
        raise Exception(f"Operation failed after {MAX_RETRIES} retries.") from last_error

    return wrapper
# Initialize the database
@with_retry
def init_db():
    """Create every table used by this module and seed the default rows.

    All statements use ``IF NOT EXISTS`` / ``OR IGNORE``, so calling this on
    an already initialised database leaves existing data untouched. The
    directory part of ``DB_PATH`` is created first when it is missing, and the
    connection is closed even if a statement fails.
    """
    db_dir = os.path.dirname(DB_PATH)
    if db_dir:
        # sqlite3.connect cannot create missing parent directories.
        os.makedirs(db_dir, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute('''CREATE TABLE IF NOT EXISTS progress
                          (id INTEGER PRIMARY KEY AUTOINCREMENT,
                           sentence TEXT,
                           idx INTEGER,
                           total INTEGER,
                           status TEXT,
                           timestamp TEXT,
                           ogg_files_count INTEGER)''')
        # split_sentences table, including the name and processing_time columns
        cursor.execute('''CREATE TABLE IF NOT EXISTS split_sentences
                          (id INTEGER PRIMARY KEY AUTOINCREMENT,
                           sentence TEXT,
                           part_id INTEGER,
                           total_parts INTEGER,
                           status TEXT,
                           is_chapter INTEGER DEFAULT 0,
                           audio_filename TEXT,
                           original_order INTEGER,
                           chapter TEXT,
                           section INTEGER,
                           audio_duration INTEGER DEFAULT 0,
                           name TEXT,
                           processing_time INTEGER)''')
        # Single-row table: the CHECK constraint pins the only row to id = 1.
        cursor.execute('''CREATE TABLE IF NOT EXISTS submission_info
                          (id INTEGER PRIMARY KEY DEFAULT 1 CHECK(id = 1),
                           filename TEXT,
                           first_slice TEXT)''')
        # Table holding detailed error messages
        cursor.execute('''CREATE TABLE IF NOT EXISTS detailed_info
                          (id INTEGER PRIMARY KEY AUTOINCREMENT,
                           message TEXT,
                           timestamp TEXT)''')
        # speaker table
        cursor.execute('''CREATE TABLE IF NOT EXISTS speaker
                          (id INTEGER PRIMARY KEY AUTOINCREMENT,
                           name TEXT,
                           sort_order INTEGER,
                           is_default INTEGER DEFAULT 0)''')
        # Seed one default speaker, only when the table is still empty.
        cursor.execute('''INSERT OR IGNORE INTO speaker (id, name, sort_order, is_default)
                          SELECT 1, '叶内法', 1, 1
                          WHERE NOT EXISTS (SELECT 1 FROM speaker)''')
        # gaojiset table (single row, id fixed to 1)
        cursor.execute('''CREATE TABLE IF NOT EXISTS `gaojiset`
                          (id INTEGER PRIMARY KEY DEFAULT 1 CHECK(id = 1),
                           cus_set TEXT,
                           defaut TEXT,
                           liushi INTEGER DEFAULT 0)''')
        # Insert the default gaojiset row; ignored when row 1 already exists.
        cursor.execute('''INSERT OR IGNORE INTO `gaojiset` (id, defaut, liushi) VALUES (1, 'http://127.0.0.1:9880', 0)''')
        # merge_status table
        cursor.execute('''CREATE TABLE IF NOT EXISTS merge_status (
                              id INTEGER PRIMARY KEY AUTOINCREMENT,
                              task_id INTEGER DEFAULT 1,
                              start_time TEXT,
                              end_time TEXT,
                              status TEXT CHECK(status IN ('pending', 'in_progress', 'completed', 'failed')),
                              progress REAL DEFAULT 0.0,
                              total_duration INTEGER DEFAULT 0,
                              current_duration INTEGER DEFAULT 0,
                              error_message TEXT,
                              output_files TEXT,
                              current_file TEXT,
                              filenames TEXT
                          )''')
        conn.commit()
    finally:
        conn.close()
@with_retry
def clear_database(clear_filename=False):
    """Delete all rows from progress, split_sentences and detailed_info.

    Args:
        clear_filename: When true, also reset ``filename`` and ``first_slice``
            of the submission_info row to empty strings (the row is kept).
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute('DELETE FROM progress')
        cursor.execute('DELETE FROM split_sentences')
        cursor.execute('DELETE FROM detailed_info')
        if clear_filename:
            # Blank the fields instead of deleting the record. Single quotes:
            # double-quoted text is an identifier in standard SQL.
            cursor.execute("UPDATE submission_info SET filename='', first_slice='' WHERE id=1")
        conn.commit()
    finally:
        conn.close()
@with_retry
def shrink_database():
    """Run ``VACUUM`` on the database; the connection is always closed."""
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('VACUUM')
        conn.commit()
    finally:
        conn.close()
def get_valid_speakers():
    """Return the set of all speaker names stored in the speaker table."""
    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute('SELECT name FROM speaker').fetchall()
    finally:
        conn.close()
    return {row[0] for row in rows}
def get_default_speaker():
    """Return the name of the speaker flagged ``is_default=1``.

    Falls back to the preset name "叶内法" when no speaker carries the flag.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        result = conn.execute('SELECT name FROM speaker WHERE is_default=1 LIMIT 1').fetchone()
    finally:
        conn.close()
    if result:
        return result[0]
    return "叶内法"  # preset fallback when no default speaker exists
def get_all_speakers():
    """Return all speakers ordered by ``sort_order``.

    Returns:
        A list of ``{"name": ..., "is_default": ...}`` dicts, one per row.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute('SELECT name, is_default FROM speaker ORDER BY sort_order ASC').fetchall()
    finally:
        conn.close()
    return [{"name": name, "is_default": is_default} for name, is_default in rows]
def update_default_speaker(speaker_name):
    """Make ``speaker_name`` the only speaker flagged as default.

    When no speaker with that name exists the change is rolled back, so the
    previous default stays in place instead of the table ending up with no
    default at all.

    Args:
        speaker_name: Name of the speaker that should become the default.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        # Clear the flag everywhere, then set it on the requested speaker.
        cursor.execute('UPDATE speaker SET is_default=0')
        cursor.execute('UPDATE speaker SET is_default=1 WHERE name=?', (speaker_name,))
        if cursor.rowcount == 0:
            # Unknown name: undo the clearing step as well.
            conn.rollback()
            logger.warning(f"Speaker {speaker_name!r} not found; default speaker left unchanged.")
        else:
            conn.commit()
    finally:
        conn.close()
def add_speaker_to_db(speaker_name):
    """Insert ``speaker_name`` as a non-default speaker unless it already exists.

    A new speaker is appended at the end of the ordering
    (``MAX(sort_order) + 1``, or 1 for an empty table).
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        # Skip the insert when a speaker with this name is already present.
        cursor.execute('SELECT COUNT(*) FROM speaker WHERE name=?', (speaker_name,))
        if cursor.fetchone()[0] == 0:
            cursor.execute('SELECT MAX(sort_order) FROM speaker')
            max_sort_order = cursor.fetchone()[0] or 0  # MAX() is NULL on an empty table
            cursor.execute('INSERT INTO speaker (name, sort_order, is_default) VALUES (?, ?, 0)',
                           (speaker_name, max_sort_order + 1))
        conn.commit()
    finally:
        conn.close()
def delete_speaker_from_db(speaker_name):
    """Delete the speaker named ``speaker_name``.

    Raises:
        ValueError: If that speaker is currently flagged as the default.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        # The default speaker must never be removed.
        cursor.execute('SELECT is_default FROM speaker WHERE name=?', (speaker_name,))
        is_default = cursor.fetchone()
        if is_default and is_default[0] == 1:
            raise ValueError("Cannot delete the default speaker.")
        cursor.execute('DELETE FROM speaker WHERE name=?', (speaker_name,))
        conn.commit()
    finally:
        # Also reached on the ValueError path, which used to leak the connection.
        conn.close()
# Record the submission_info content before a submission is processed
@with_retry
def record_submission_info(filename, first_slice):
    """Store ``filename`` and ``first_slice`` as the single submission_info row.

    Args:
        filename: Name to store; ``None`` is stored as an empty string.
        first_slice: Text to store; ``None`` is stored as an empty string and
            any other non-string value is converted with ``str()``.
    """
    if filename is None:
        filename = ""
    if first_slice is None:
        # Previously str(None) stored the literal text "None".
        first_slice = ""
    elif not isinstance(first_slice, str):
        first_slice = str(first_slice)

    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute('DELETE FROM submission_info')  # drop the previous record first
        cursor.execute('''INSERT OR REPLACE INTO submission_info (id, filename, first_slice) VALUES (1, ?, ?)''',
                       (filename, first_slice))
        conn.commit()
    finally:
        conn.close()
# Read the content of the submission_info table
def get_submission_info():
    """Return ``(filename, first_slice)`` of row 1, or ``(None, None)`` if absent."""
    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute('SELECT filename, first_slice FROM submission_info WHERE id = 1').fetchone()
    finally:
        conn.close()
    return row if row else (None, None)
# Check for unfinished work
def has_unfinished_sentences():
    """Return True when split_sentences has at least one 'unfinished' row."""
    conn = sqlite3.connect(DB_PATH)
    try:
        # Bound parameter instead of a double-quoted literal, which SQL
        # treats as an identifier.
        count = conn.execute('SELECT COUNT(*) FROM split_sentences WHERE status=?',
                             ("unfinished",)).fetchone()[0]
    finally:
        conn.close()
    return count > 0
# Fetch the unfinished progress record
def get_unfinished_progress():
    """Return the newest 'unfinished' row of the progress table, or None.

    The row is returned as a tuple in table column order (``SELECT *``).
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute('SELECT * FROM progress WHERE status=? ORDER BY id DESC LIMIT 1',
                           ("unfinished",)).fetchone()
    finally:
        conn.close()
    return row
# Fetch the sentence at the interruption point
def get_unfinished_sentence():
    """Return the 'unfinished' sentence with the lowest part_id, or "" if none."""
    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute('SELECT sentence FROM split_sentences WHERE status=? ORDER BY part_id ASC LIMIT 1',
                           ("unfinished",)).fetchone()
    finally:
        conn.close()
    return row[0] if row else ""
# Count chapters and sentences
def get_progress_summary():
    """Summarise the split_sentences table.

    Returns:
        ``(chapter_count, total_count, completed_count, remaining_count)``
        where chapters are rows with ``is_chapter = 1`` and remaining is
        total minus completed.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT COUNT(*) FROM split_sentences WHERE is_chapter = 1')
        chapter_count = cursor.fetchone()[0]
        cursor.execute('SELECT COUNT(*) FROM split_sentences WHERE status = ?', ("completed",))
        completed_count = cursor.fetchone()[0]
        cursor.execute('SELECT COUNT(*) FROM split_sentences')
        total_count = cursor.fetchone()[0]
    finally:
        conn.close()
    remaining_count = total_count - completed_count
    return chapter_count, total_count, completed_count, remaining_count
@with_retry
def save_unfinished_progress(idx, total, status):
    """Update the 'unfinished' progress rows with the current position.

    Also stores the current timestamp and the number of ``.ogg`` files found
    in ``./temp``.

    Args:
        idx: Value written to the ``idx`` column.
        total: Value written to the ``total`` column.
        status: New value for the ``status`` column.

    Raises:
        FileNotFoundError: If the ``./temp`` directory does not exist.
    """
    # Count before connecting so a missing directory cannot leave a
    # connection open.
    ogg_files_count = sum(1 for f in os.listdir('./temp') if f.endswith('.ogg'))
    # Explicit text form; identical to what the (deprecated) implicit
    # datetime adapter of the sqlite3 module stores.
    timestamp = datetime.now().isoformat(" ")

    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('''UPDATE progress SET idx=?, total=?, status=?, timestamp=?, ogg_files_count=?
                        WHERE status='unfinished' ''',
                     (idx, total, status, timestamp, ogg_files_count))
        conn.commit()
    finally:
        conn.close()
@with_retry
def clear_unfinished_progress():
    """Delete every 'unfinished' row from progress and split_sentences."""
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute('DELETE FROM progress WHERE status=?', ("unfinished",))
        cursor.execute('DELETE FROM split_sentences WHERE status=?', ("unfinished",))
        conn.commit()
    finally:
        conn.close()
@with_retry
def save_sentences_to_db(sentences):
    """Insert all split sentences as 'unfinished' rows in one transaction.

    Args:
        sentences: Sequence of ``(sentence, is_chapter, chapter, section, name)``
            tuples. The position in the sequence is stored as both
            ``part_id`` and ``original_order``.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        total_parts = len(sentences)
        for idx, (sentence, is_chapter, chapter, section, name) in enumerate(sentences):
            if chapter:
                # NOTE(review): the original comment says this filters out the
                # characters "第" and "章", but both search strings are empty
                # in the source as seen, which makes the calls no-ops.
                # Behavior is kept as-is; confirm the intended characters.
                chapter = chapter.replace('', '').replace('', '')
            cursor.execute('''INSERT INTO split_sentences
                              (sentence, part_id, total_parts, status, is_chapter, original_order, chapter, section, name)
                              VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)''',
                           (sentence, idx, total_parts, "unfinished", is_chapter, idx, chapter, section, name))
        conn.commit()
    finally:
        # Closing without commit discards a partially inserted batch.
        conn.close()
# Load the unfinished sentences from the database
def load_sentences_from_db():
    """Return all 'unfinished' sentences ordered by ``part_id``.

    Returns:
        A list of ``sqlite3.Row`` objects with the columns ``sentence``,
        ``part_id`` and ``name`` (accessible by name or index).
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.row_factory = sqlite3.Row  # allows access to columns by name
        rows = conn.execute('SELECT sentence, part_id, name FROM split_sentences WHERE status=? ORDER BY part_id',
                            ("unfinished",)).fetchall()
    finally:
        conn.close()
    return rows
@with_retry
def update_sentence_status(part_id, status, filename=None):
    """Set the status of the sentence(s) with the given ``part_id``.

    Args:
        part_id: ``part_id`` value of the rows to update.
        status: New status text.
        filename: When truthy, also stored in ``audio_filename``; otherwise
            that column is left unchanged.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        if filename:
            conn.execute('UPDATE split_sentences SET status=?, audio_filename=? WHERE part_id=?',
                         (status, filename, part_id))
        else:
            conn.execute('UPDATE split_sentences SET status=? WHERE part_id=?',
                         (status, part_id))
        conn.commit()
    finally:
        conn.close()
@with_retry
def update_sentence_duration(part_id, duration):
    """Store ``duration`` in ``audio_duration`` for the rows matching ``part_id``."""
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('UPDATE split_sentences SET audio_duration=? WHERE part_id=?',
                     (duration, part_id))
        conn.commit()
    finally:
        conn.close()
@with_retry
def update_sentence_processing_time(part_id, processing_time):
    """Store ``processing_time`` for the rows matching ``part_id``.

    NOTE: a function with the same name is defined again further down in
    this module; that later definition is the one bound after import.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('UPDATE split_sentences SET processing_time=? WHERE part_id=?',
                     (processing_time, part_id))
        conn.commit()
    finally:
        conn.close()
# Overall progress figures
def get_progress_data():
    """Return ``(total_count, completed_count, remaining_count)`` for split_sentences."""
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT COUNT(*) FROM split_sentences WHERE status = ?', ("completed",))
        completed_count = cursor.fetchone()[0]
        cursor.execute('SELECT COUNT(*) FROM split_sentences')
        total_count = cursor.fetchone()[0]
    finally:
        conn.close()
    remaining_count = total_count - completed_count
    return total_count, completed_count, remaining_count
# Number of chapters and sections
def get_chapter_and_section_counts():
    """Count distinct chapters and distinct sections in split_sentences.

    Chapters are counted over rows with ``is_chapter = 1``; sections over
    rows whose ``section`` is not NULL. Any error is logged and reported as
    ``(0, 0)``.

    Returns:
        ``(num_chapters, num_sections)``.
    """
    db = sqlite3.connect(DB_PATH)
    cur = db.cursor()
    counts = (0, 0)
    try:
        chapter_row = cur.execute(
            'SELECT COUNT(DISTINCT chapter) FROM split_sentences WHERE is_chapter = 1'
        ).fetchone()
        section_row = cur.execute(
            'SELECT COUNT(DISTINCT section) FROM split_sentences WHERE section IS NOT NULL'
        ).fetchone()
        counts = (chapter_row[0] or 0, section_row[0] or 0)
    except Exception as e:
        logger.error(f"Error in get_chapter_and_section_counts: {e}")
    finally:
        db.close()
    return counts
def update_filename(new_filename):
    """Replace the ``filename`` stored in submission_info row 1."""
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('UPDATE submission_info SET filename = ? WHERE id = 1', (new_filename,))
        conn.commit()
    finally:
        conn.close()
# Total audio duration
def get_total_audio_duration():
    """Return the summed ``audio_duration`` of all 'completed' sentences (0 if none)."""
    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute('SELECT SUM(audio_duration) FROM split_sentences WHERE status=?',
                           ("completed",)).fetchone()
    finally:
        conn.close()
    return row[0] or 0  # SUM() is NULL when no row matches
# Average duration per section
def get_avg_section_duration():
    """Return total completed audio duration divided by the section count.

    Returns 0 when either the total duration or the number of sections is 0.
    """
    _, section_total = get_chapter_and_section_counts()
    duration_total = get_total_audio_duration()
    if duration_total == 0 or section_total == 0:
        return 0
    return duration_total / section_total
@with_retry
def log_error_to_detailed_info(message):
    """Insert ``message`` with the current timestamp into detailed_info.

    NOTE: a function with the same name is defined again further down in
    this module; that later definition is the one bound after import.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        # Explicit text form; identical to what the (deprecated) implicit
        # datetime adapter of the sqlite3 module stores.
        conn.execute('INSERT INTO detailed_info (message, timestamp) VALUES (?, ?)',
                     (message, datetime.now().isoformat(" ")))
        conn.commit()
    finally:
        conn.close()
# Read the custom advanced settings
def get_custom_settings():
    """Return ``cus_set`` from gaojiset, falling back to ``defaut``.

    Returns:
        ``cus_set`` when it is non-empty, otherwise ``defaut`` when that is
        non-empty, otherwise an empty string (also when the table is empty).
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        result = conn.execute("SELECT cus_set, defaut FROM gaojiset LIMIT 1").fetchone()
    finally:
        conn.close()
    if result and result[0]:
        return result[0]
    if result and result[1]:
        return result[1]
    return ""
# Update the custom advanced settings
def update_custom_settings(new_settings):
    """Store ``new_settings`` in the ``cus_set`` column of gaojiset row 1."""
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("UPDATE gaojiset SET cus_set = ? WHERE rowid = 1", (new_settings,))
        conn.commit()
    finally:
        conn.close()
# Restore the default advanced settings
def restore_defaults():
    """Copy the ``defaut`` column into ``cus_set`` for gaojiset row 1."""
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("UPDATE gaojiset SET cus_set = defaut WHERE rowid = 1")
        conn.commit()
    finally:
        conn.close()
def get_custom_url():
    """Return the service URL configured in gaojiset.

    Preference order: ``cus_set`` if it starts with ``http://`` or
    ``https://``, then ``defaut`` under the same condition, then the
    hard-coded fallback ``http://127.0.0.1:9880``.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute('SELECT cus_set, defaut FROM gaojiset WHERE id = 1 LIMIT 1').fetchone()
    finally:
        conn.close()

    schemes = ('http://', 'https://')
    if row:
        custom_url, default_url = row
        if custom_url and custom_url.startswith(schemes):
            return custom_url
        if default_url and default_url.startswith(schemes):
            return default_url
    return 'http://127.0.0.1:9880'  # last-resort default
@with_retry
def update_sentence_processing_time(part_id, processing_time):
    """
    Update the processing time of a sentence in the split_sentences table.

    An identical function is defined earlier in this module; this later
    definition replaces it at import time.

    :param part_id: part_id of the sentence, used to select the rows to update.
    :param processing_time: time spent processing the sentence, in milliseconds.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('UPDATE split_sentences SET processing_time=? WHERE part_id=?',
                     (processing_time, part_id))
        conn.commit()
    finally:
        conn.close()
def log_error_to_detailed_info(message):
    """Log ``message`` as an error and persist it in the detailed_info table.

    This definition replaces the earlier, ``@with_retry``-decorated function
    of the same name, so database errors raised here are not retried.
    """
    logger.error(message)
    # Persist the message as well; use the shared DB_PATH constant rather
    # than a second copy of the path literal.
    conn = sqlite3.connect(DB_PATH)
    try:
        # Explicit text form; identical to what the (deprecated) implicit
        # datetime adapter of the sqlite3 module stores.
        conn.execute('INSERT INTO detailed_info (message, timestamp) VALUES (?, ?)',
                     (message, datetime.now().isoformat(" ")))
        conn.commit()
    finally:
        conn.close()
def get_merge_status():
    """Return the merge_status row for task 1 as a dict, or None if absent.

    Columns are selected by name so the result does not depend on the
    physical column order of the table. When several rows have
    ``task_id = 1`` the one with the lowest ``id`` is returned.

    Returns:
        A dict with the keys ``id``, ``task_id``, ``start_time``,
        ``end_time``, ``status``, ``progress``, ``total_duration``,
        ``current_duration``, ``error_message``, ``output_files``,
        ``current_file`` and ``filenames``; or ``None``.
    """
    columns = (
        "id", "task_id", "start_time", "end_time", "status", "progress",
        "total_duration", "current_duration", "error_message",
        "output_files", "current_file", "filenames",
    )
    column_list = ", ".join(columns)
    conn = sqlite3.connect(DB_PATH)
    try:
        row = conn.execute(
            f'SELECT {column_list} FROM merge_status WHERE task_id=1 ORDER BY id LIMIT 1'
        ).fetchone()
    finally:
        conn.close()
    return dict(zip(columns, row)) if row else None
@with_retry
def create_merge_task(total_duration):
    """Reset merge_status to a single fresh 'pending' row for task 1.

    The row is written with an explicit ``id = 1`` because
    ``update_merge_status`` updates ``WHERE id = 1``. Without a unique key
    in the column list, ``INSERT OR REPLACE`` never replaced anything and
    every call added another row while readers kept seeing the stale first
    one.

    Args:
        total_duration: Value stored in the ``total_duration`` column.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        # Remove rows left over from earlier runs so exactly one remains.
        cursor.execute('DELETE FROM merge_status WHERE task_id = 1')
        cursor.execute('''INSERT OR REPLACE INTO merge_status (id, task_id, start_time, status, total_duration)
                          VALUES (1, 1, datetime('now'), 'pending', ?)''', (total_duration,))
        conn.commit()
    finally:
        conn.close()
@with_retry
def update_merge_status(current_file=None, progress=None, current_duration=None, status=None, append_filename=None, error_message=None, start_time=None, end_time=None, total_duration=None):
    """Update selected columns of merge_status row ``id = 1``.

    Only arguments that are not ``None`` are written; when all are ``None``
    a warning is logged and nothing is executed.

    Args:
        current_file: New value for ``current_file``.
        progress: New value for ``progress``.
        current_duration: New value for ``current_duration``.
        status: New value for ``status``.
        append_filename: Text appended to the existing ``output_files`` value.
        error_message: New value for ``error_message``.
        start_time: New value for ``start_time``.
        end_time: New value for ``end_time``.
        total_duration: New value for ``total_duration``.

    Raises:
        sqlite3.OperationalError: Propagated to ``with_retry`` so the update
            is actually retried. Other errors are logged and swallowed.
    """
    # (SET clause, value) pairs in the column order used by the UPDATE.
    candidates = [
        ("current_file = ?", current_file),
        ("progress = ?", progress),
        ("current_duration = ?", current_duration),
        ("status = ?", status),
        # output_files may still be NULL, and NULL || text is NULL in SQLite,
        # so treat NULL as an empty string when appending.
        ("output_files = COALESCE(output_files, '') || ?", append_filename),
        ("error_message = ?", error_message),
        ("start_time = ?", start_time),
        ("end_time = ?", end_time),
        ("total_duration = ?", total_duration),
    ]
    fields_to_update = [clause for clause, value in candidates if value is not None]
    values = [value for _, value in candidates if value is not None]

    if not fields_to_update:
        logger.warning("没有要更新的字段")
        return

    conn = None
    try:
        conn = sqlite3.connect(DB_PATH)
        # Build the UPDATE from the selected clauses; values stay bound parameters.
        sql = f"UPDATE merge_status SET {', '.join(fields_to_update)} WHERE id = 1"
        conn.execute(sql, values)
        conn.commit()
        logger.info("成功更新 merge_status 表中的数据")
    except sqlite3.OperationalError:
        # Let the retry decorator see this; a broad handler would hide it.
        raise
    except Exception as e:
        logger.error(f"更新 merge_status 表时出错: {str(e)}")
    finally:
        # conn is still None when connect() itself failed.
        if conn is not None:
            conn.close()
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化