代码拉取完成,页面将自动刷新
同步操作将从 Oops404/哔哩BCM 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
# -*- coding: utf-8 -*-
# author: cheneyjin@outlook.com
# update 20221002
import traceback
import _thread
import json
import logging
import os
import re
import sys
import uuid
import time
from subprocess import Popen, PIPE
from PyQt5 import QtCore, QtWidgets, QtGui
from PyQt5.QtGui import QIcon, QTextCursor
from PyQt5.QtWidgets import QMainWindow, QApplication
import cgitb
from numpy import fromfile, uint8
# import qdarkstyle
# Application-wide Qt style sheet; applied to the main window in
# UiMainWindow.setup_ui via setStyleSheet. Forces one font family on all widgets.
style_global = """
*{
font-family: "Microsoft YaHei";
}
"""
# Absolute directory of the launched script (derived from sys.argv[0]).
# The output/transcode folders and the bundled ffmpeg path are built from it.
path_app = os.path.split(os.path.abspath(sys.argv[0]))[0]
# Prefix put in front of an output name when a file with that name already
# exists in the output directory (see UiMainWindow.task).
TAG_DUPLICATE = "名字重复!"
# Identifiers for the cache layout a Target was discovered in.
# Only CACHE_TYPE_ANDROID and CACHE_TYPE_PC are referenced by the code
# visible in this file; the remaining values appear to be reserved.
CACHE_TYPE_ANDROID = 1
CACHE_TYPE_PC = 2
CACHE_TYPE_UWP = 3
CACHE_TYPE_IOS = 4
CACHE_TYPE_OSX = 5
CACHE_TYPE_OTHER = 6
def validate(file_name_str):
    """Return *file_name_str* with every forbidden file-name character replaced by ``_``.

    The characters replaced are ``/ \\ : * ? " < > |``.
    """
    forbidden_chars = re.compile(r"[\/\\\:\*\?\"\<\>\|]")
    return forbidden_chars.sub("_", file_name_str)
class Target(object):
    """One mergeable cache entry: a video stream, an audio stream and where to put the result."""

    def __init__(self, cache_type, bvid, name, video_path, audio_path, root) -> None:
        """Store the description of a single merge job.

        :param cache_type: one of the ``CACHE_TYPE_*`` constants.
        :param bvid: video identifier, used for logging.
        :param name: output file name without extension.
        :param video_path: path of the video stream file.
        :param audio_path: path of the audio stream file.
        :param root: directory the merged ``.mp4`` is written to.
        """
        super().__init__()
        self.cache_type = cache_type
        self.bvid = bvid
        self.name = name
        self.video_path = video_path
        self.audio_path = audio_path
        self.root = root

    def __str__(self) -> str:
        """Return a one-line, labelled summary of the job."""
        # One placeholder per value: the previous format string had four
        # placeholders for five arguments, which shifted every label by one
        # field and silently dropped ``root``.
        return "bvid:{}, name:{}, video_path:{}, audio_path:{}, root:{}".format(
            self.bvid,
            self.name,
            self.video_path,
            self.audio_path,
            self.root
        )
class UiMainWindow(object):
    """Main-window controller of the cache merging tool.

    It builds the widgets, scans a user-selected directory for Android
    (``video.m4s`` / ``audio.m4s`` / ``entry.json``) and PC (``.videoInfo`` +
    two ``.m4s`` files) cache layouts, and merges every discovered entry into
    an ``.mp4`` by running the bundled ffmpeg in a background thread.
    """

    def __init__(self, logger) -> None:
        """Initialise state and make sure the output/transcode folders exist.

        :param logger: logger used for all diagnostics of this window.
        """
        super().__init__()
        self.logger = logger
        self.task_id = None
        self.task_list = None
        self.directory = None
        self.target_video_name = "video.m4s"
        self.target_audio_name = "audio.m4s"
        self.target_android_file_info = "entry.json"
        self.target_pc_file_info = ".videoInfo"
        self.merging = False
        self.searching = False
        self.font = QtGui.QFont()
        self.font.setPointSize(10)
        self.translate = QtCore.QCoreApplication.translate
        self.path_output = "{}/{}".format(path_app, self.translate("output", "输出目录"))
        self.trans_output = "{}/{}".format(path_app, self.translate("trans", "转换目录"))
        if not os.path.exists(self.path_output):
            os.mkdir(self.path_output)
        if not os.path.exists(self.trans_output):
            os.mkdir(self.trans_output)

    def retranslate_ui(self, main_window):
        """Set the window title, icon, fixed size and all user-visible texts."""
        main_window.setWindowTitle(self.translate(
            "bili cache merging tool",
            "哔哩BCM-翻滚吧年糕君 ID:1489684 "
        ))
        main_window.setFixedSize(main_window.width(), main_window.height())
        main_window.setWindowIcon(QIcon('local/favicon.ico'))
        self.text_view.setText(self.translate(
            "copy cache folder from your phone local storage path :\n"
            "'/Android/data/tv.danmaku.bili/download' into your pc. \n"
            "note: do not change anything in cache folder",
            "从手机存储路径:'/Android/data/tv.danmaku.bili/download'"
            "拷贝缓存'完整文件夹'到电脑中,至任意目录内。"
        ))
        self.text_view.append("\r")
        self.text_view.append(self.translate(
            "if application gets some exception, please email me at 'cheneyjin@outlook.com' "
            "and attach the file named 'log.md' that in application folder.",
            "如果运行异常,请邮件至'cheneyjin@outlook.com',"
            "邮件中请上传程序目录下的log.md文件, 内容能图文并茂就更好了~"
        ))
        self.text_view.append("<a href='https://gitee.com/Oops404/nas-guard/raw/master"
                              "/%E6%AC%A2%E8%BF%8E%E6%94%AF%E6%8C%81.jpg'>👉欢迎打赏👈o( ̄▽ ̄)ブ</a>")
        self.path_select_btn.setText(self.translate("select cache path", "选择路径"))
        self.start_btn.setText(self.translate("start merge", "开始"))

    # noinspection PyAttributeOutsideInit,PyUnresolvedReferences
    def setup_ui(self, main_window):
        """Create the widgets, lay them out vertically and wire the button signals."""
        main_window.setObjectName("mainWindow")
        main_window.resize(600, 320)
        main_window.setStyleSheet(style_global)
        self.central_widget = QtWidgets.QWidget(main_window)
        self.central_widget.setObjectName("central_widget")
        self.vertical_LayoutWidget = QtWidgets.QWidget(self.central_widget)
        self.vertical_LayoutWidget.setGeometry(QtCore.QRect(0, 0, 600, 309))
        self.vertical_LayoutWidget.setObjectName("verticalLayoutWidget")
        self.vertical_LayoutWidget.setContentsMargins(1, 1, 1, 1)
        self.vertical_layout = QtWidgets.QVBoxLayout(self.vertical_LayoutWidget)
        self.vertical_layout.setObjectName("verticalLayout")
        # Top: "select path" button.
        self.path_select_btn = QtWidgets.QPushButton(self.vertical_LayoutWidget)
        self.path_select_btn.setObjectName("path_select_btn")
        self.path_select_btn.setFont(self.font)
        self.vertical_layout.addWidget(self.path_select_btn)
        # Middle: message/progress view (external links are clickable).
        self.text_view = QtWidgets.QTextBrowser(self.vertical_LayoutWidget)
        self.text_view.setObjectName("text_view")
        self.text_view.setFont(self.font)
        self.text_view.setOpenExternalLinks(True)
        self.vertical_layout.addWidget(self.text_view)
        # Bottom: "start" button.
        self.start_btn = QtWidgets.QPushButton(self.vertical_LayoutWidget)
        self.start_btn.setObjectName("start_btn")
        self.start_btn.setContentsMargins(0, 10, 0, 10)
        self.start_btn.setFont(self.font)
        self.vertical_layout.addWidget(self.start_btn)
        main_window.setCentralWidget(self.central_widget)
        self.status_bar = QtWidgets.QStatusBar(main_window)
        self.status_bar.setObjectName("status_bar")
        main_window.setStatusBar(self.status_bar)
        self.path_select_btn.clicked.connect(self.select_path)
        self.start_btn.clicked.connect(self.start_merge)
        self.retranslate_ui(main_window)
        QtCore.QMetaObject.connectSlotsByName(main_window)

    def start_merge(self):
        """Slot of the start button: launch the merge thread unless busy or nothing to do."""
        try:
            if self.searching:
                self.text_view.append(self.translate(
                    "searching cache task is not over, do not click again",
                    "检测缓存数量任务还没结束,别点啦!"
                ))
                return
            if self.task_list is None or len(self.task_list) < 1:
                self.text_view.append(self.translate(
                    "no cache file to be searched",
                    "别点啦,没检测到文件"
                ))
                return
            if self.merging:
                self.text_view.append(self.translate(
                    "program is working,do not click again",
                    "正在工作,不要重复点击"
                ))
                return
            self.text_view.setText(self.translate(
                "start merging,please wait a minute",
                "开始合并,请等待..."
            ))
            self.merging = True
            _thread.start_new_thread(self.task, ("bcm-task1", 2,))
        finally:
            # Keep the newest message visible whatever branch was taken.
            self.text_view.moveCursor(QTextCursor.End)

    def merge_cache(self, _idx, _video_path, _audio_path, _root_path, _bvid, _file_name):
        """Mux one video and one audio stream into ``<_root_path>/<_file_name>.mp4``.

        Streams are copied without re-encoding by the ffmpeg binary shipped in
        ``<path_app>/local``. The outcome and the overall progress are
        appended to the text view.

        :param _idx: zero-based index of this job in ``self.task_list``.
        :param _video_path: path of the video input.
        :param _audio_path: path of the audio input.
        :param _root_path: directory of the output file.
        :param _bvid: video identifier, only used in log lines.
        :param _file_name: output file name without extension.
        """
        # NOTE(review): the command is a shell string. Arguments are quoted and
        # names pass through validate(), but paths are still interpreted by the
        # shell; consider a list of arguments with shell=False.
        _cmd = "\"{}/local/ffmpeg.exe\" -i \"{}\" -i \"{}\" -c:v copy -c:a copy \"{}/{}.mp4\"".format(
            path_app,
            _video_path,
            _audio_path,
            _root_path,
            _file_name
        )
        self.logger.info("[{}]:{}".format(_bvid, _cmd))
        with Popen(_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE) as p:
            _, _stderr = p.communicate()
        # Previously the exit status was ignored and every job was reported as
        # merged even when ffmpeg failed.
        _failed = p.returncode != 0
        if _failed:
            self.logger.error("[{}] ffmpeg exited with code {}: {}".format(
                _bvid,
                p.returncode,
                (_stderr or b"").decode("utf-8", errors="replace")
            ))
        self.text_view.append("内容:{},{}\n进度:{}%".format(
            _file_name,
            "合并失败" if _failed else "已合并",
            str(round(((_idx + 1) / len(self.task_list)) * 100, 2)))
        )
        self.text_view.moveCursor(QTextCursor.End)

    def _merge_pc_cache(self, index, target):
        """Decode both PC cache streams into temporary files, merge them, then clean up."""
        video_trans_path = '{}/{}-1.mp4'.format(self.trans_output, uuid.uuid1())
        audio_trans_path = '{}/{}-2.mp4'.format(self.trans_output, uuid.uuid1())
        try:
            self.pc_cache_decode(target.video_path, video_trans_path)
            self.pc_cache_decode(target.audio_path, audio_trans_path)
            self.merge_cache(index, video_trans_path, audio_trans_path,
                             target.root, target.bvid, target.name)
        finally:
            # Always remove the temporary files, and tolerate their absence:
            # pc_cache_decode writes nothing for an unrecognised header.
            for tmp_path in (video_trans_path, audio_trans_path):
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)

    def task(self, arg1, arg2):
        """Thread entry point: merge every entry of ``self.task_list`` in order.

        ``arg1`` and ``arg2`` are the values passed by ``start_merge`` and are
        not used. Any exception stops the batch, is logged and shown to the
        user; the task list and the ``merging`` flag are always reset.
        """
        # NOTE(review): this runs on a thread started with _thread while it
        # (and merge_cache) update text_view directly; Qt widgets are normally
        # meant to be touched from the GUI thread only — consider signals.
        try:
            self.task_id = uuid.uuid1()
            self.logger.info("\nTASK START {}".format(self.task_id))
            for index, f in enumerate(self.task_list):
                result_file = "{}/{}.mp4".format(f.root, f.name)
                if os.path.exists(result_file):
                    # Never overwrite an existing result: make the name unique.
                    f.name = TAG_DUPLICATE + f.name + str(time.time())
                if CACHE_TYPE_ANDROID == f.cache_type:
                    self.merge_cache(index, f.video_path, f.audio_path, f.root, f.bvid, f.name)
                elif CACHE_TYPE_PC == f.cache_type:
                    self._merge_pc_cache(index, f)
        except Exception as e:
            self.logger.error(traceback.format_exc())
            self.text_view.append(str(e))
        finally:
            self.merging = False
            self.task_list = list()
            self.logger.info("TASK FINISHED {}".format(self.task_id))
            self.text_view.append(self.translate(
                "finished. check the output path in app dir.",
                "运行结束,查看本应用路径下的\"输出目录\"吧。"
            ))

    def select_path(self):
        """Slot of the path button: ask for a directory and scan it for caches."""
        if self.merging:
            # Replacing task_list while the merge thread is using it breaks the
            # progress computation and the new selection would be discarded
            # when the running task finishes.
            self.text_view.append(self.translate(
                "program is working,do not click again",
                "正在工作,不要重复点击"
            ))
            return
        self.task_list = list()
        self.directory = QtWidgets.QFileDialog.getExistingDirectory(
            None,
            self.translate("select cache path", "选取文件夹"),
            "D:/"
        )
        self.text_view.setText("{}: {}".format(self.translate("path", "路径"), self.directory))
        path_str = str(self.directory)
        if "" == path_str:
            self.text_view.append(self.translate("no path selected", "没选择任何路径。"))
            return
        try:
            self.searching = True
            self.search_path(path_str)
            self.text_view.append("{}: {}{}".format(
                self.translate("find", "识别到"),
                len(self.task_list),
                self.translate("cache can merged", "个可合并视频。"))
            )
        finally:
            self.searching = False

    def search_path(self, root_path):
        """Recursively scan *root_path* and register every cache entry found.

        A file whose name contains ``self.target_video_name`` is handed to
        ``android_load_file``; a file whose name contains
        ``self.target_pc_file_info`` is handed to ``pc_load_file``.
        Directories that cannot be listed are logged and skipped.
        """
        try:
            entries = os.listdir(root_path)
        except OSError as e:
            # e.g. a protected system folder: skip it instead of aborting the scan.
            self.logger.error("cannot list {}: {}".format(root_path, e))
            return
        for _l in entries:
            path = os.path.join(root_path, _l)
            if os.path.isdir(path):
                self.search_path(path)
                continue
            # Match on the file name only. Matching on the full path made every
            # file below a directory whose name contains the marker look like a
            # cache file.
            if self.target_video_name in _l:
                self.android_load_file(path)
            if self.target_pc_file_info in _l:
                self.pc_load_file(path)

    def android_parse_video_name(self, file_info):
        """Derive an output name from the parsed content of an Android ``entry.json``.

        Uses the ``ep`` index and title when present, otherwise the
        ``page_data`` subtitle/page number, otherwise ``title``. An empty
        result falls back to ``title`` and finally to a timestamped placeholder.
        """
        _name = ""
        if "ep" in file_info:
            _name = "({}){}".format(file_info["ep"]["index"], file_info["ep"]["index_title"])
        elif "page_data" in file_info:
            page_data = file_info["page_data"]
            if "download_subtitle" in page_data:
                _name = page_data["download_subtitle"]
            if "page" in page_data:
                _name = "{}_P{}".format(_name, page_data["page"])
        else:
            _name = file_info["title"]
        if len(_name) < 1:
            _name = file_info["title"]
        if len(_name) < 1:
            _name = "无法解析名称" + str(time.time())
        return _name

    def android_parse_bvid(self, file_info):
        """Return the bvid from ``file_info`` (top level or under ``ep``), or a placeholder."""
        _bvid = ""
        if "bvid" in file_info:
            _bvid = file_info["bvid"]
        elif "ep" in file_info and "bvid" in file_info["ep"]:
            _bvid = file_info["ep"]["bvid"]
        if len(_bvid) < 1:
            _bvid = "无法解析bvid" + str(time.time())
        return _bvid

    def android_detect_loss(self, parent_path):
        """Check that both Android stream files exist in *parent_path*.

        :return: ``(intact, video_path, audio_path)``; ``intact`` is ``False``
            when either file is missing, which is also logged and displayed.
        """
        video_path = "{}/{}".format(parent_path, self.target_video_name)
        audio_path = "{}/{}".format(parent_path, self.target_audio_name)
        video_exist = os.path.exists(video_path)
        audio_exist = os.path.exists(audio_path)
        if video_exist and audio_exist:
            return True, video_path, audio_path
        self.logger.error("FILE PATH {}:{}".format(video_exist, video_path))
        self.logger.error("FILE PATH {}:{}".format(audio_exist, audio_path))
        self.text_view.append("{}{}".format(
            self.translate(
                "missing file detected in cache path: ",
                "检测到缓存存在缺失,路径:"
            ),
            parent_path
        ))
        return False, video_path, audio_path

    def android_load_file(self, path):
        """Register the Android cache entry that contains the file *path*.

        The metadata file is read from the directory above the stream
        directory. If it cannot be read or parsed the error is logged and the
        entry is still registered under the name ``default``.
        """
        parent_path = os.path.abspath(os.path.join(path, ".."))
        file_info_path = "{}/../{}".format(parent_path, self.target_android_file_info)
        name = "default"
        bvid = "default"
        # noinspection PyBroadException
        try:
            with open(file_info_path, "r", encoding="utf-8") as f:
                file_info = json.load(f)
            name = self.android_parse_video_name(file_info)
            name = validate(name) if name is not None else "临时名称{}".format(uuid.uuid1())
            bvid = self.android_parse_bvid(file_info)
        except Exception as e:
            # Deliberately best effort: a broken metadata file must not stop the scan.
            self.logger.error(str(e))
            self.logger.error(traceback.format_exc())
        cache_intact, video_path, audio_path = self.android_detect_loss(parent_path)
        if not cache_intact:
            return
        # Write into the shared output directory when it exists, otherwise
        # next to the cache itself.
        if os.path.exists(self.path_output):
            parent_path = self.path_output
        self.task_list.append(
            Target(CACHE_TYPE_ANDROID, bvid, name, video_path, audio_path, parent_path)
        )

    def pc_parse_video_name(self, file_info):
        """Build an output name from ``title``, ``uname`` and ``p`` of a PC ``.videoInfo``."""
        _name = ''
        if 'title' in file_info:
            _name = file_info['title']
        if 'uname' in file_info:
            _name = _name + '_aut{}'.format(str(file_info['uname']))
        if 'p' in file_info:
            _name = _name + '_P{}'.format(str(file_info['p']))
        if len(_name) < 1:
            _name = "无法解析名称" + str(time.time())
        return _name

    def pc_parse_bvid(self, file_info):
        """Return ``file_info['bvid']`` or a timestamped placeholder when absent/empty."""
        _bvid = ''
        if 'bvid' in file_info:
            _bvid = file_info['bvid']
        if len(_bvid) < 1:
            _bvid = "无法解析bvid" + str(time.time())
        return _bvid

    def pc_detect_loss(self, _parent_path, _cid):
        """Locate the two ``.m4s`` streams of a PC cache entry.

        Files directly inside *_parent_path* whose path contains both *_cid*
        and ``.m4s`` are collected; exactly two are required. The larger file
        is returned as the video stream, the other as the audio stream.

        :return: ``(intact, video_path, audio_path)``; the paths are empty
            strings when ``intact`` is ``False``.
        """
        cache_file = []
        for _l in os.listdir(_parent_path):
            path = os.path.join(_parent_path, _l)
            if os.path.isdir(path):
                continue
            if _cid in path and '.m4s' in path:
                cache_file.append(path)
        if len(cache_file) != 2:
            # Log where the problem is and what matched (the old message
            # printed the same list twice).
            self.logger.error('pc cache elems error :{},{}'.format(_parent_path, cache_file))
            return False, '', ''
        first, second = cache_file
        if os.stat(first).st_size > os.stat(second).st_size:
            return True, first, second
        return True, second, first

    def pc_parse_cid(self, file_info):
        """Return ``cid`` (preferred) or ``itemId`` from ``file_info``; ``None`` if neither exists."""
        if 'cid' in file_info:
            return file_info['cid']
        if 'itemId' in file_info:
            return file_info['itemId']
        return None

    def pc_load_file(self, path):
        """Register the PC cache entry whose metadata file is *path*.

        Metadata is read from ``.videoInfo`` in the same directory. A read or
        parse error is logged and the defaults are used, in which case the
        stream lookup normally finds nothing and the entry is skipped.
        """
        parent_path = os.path.abspath(os.path.join(path, ".."))
        file_info_path = "{}/{}".format(parent_path, self.target_pc_file_info)
        name = 'default'
        bvid = 'default'
        cid = 'default'
        try:
            with open(file_info_path, 'r', encoding='utf-8') as f:
                file_info = json.load(f)
            name = self.pc_parse_video_name(file_info)
            name = validate(name) if name is not None else "临时名称{}".format(uuid.uuid1())
            bvid = self.pc_parse_bvid(file_info)
            cid = str(self.pc_parse_cid(file_info))
        except Exception as e:
            # Deliberately best effort: a broken metadata file must not stop the scan.
            self.logger.error(str(e))
            self.logger.error(traceback.format_exc())
        cache_intact, video_path, audio_path = self.pc_detect_loss(parent_path, cid)
        if not cache_intact:
            return
        if os.path.exists(self.path_output):
            parent_path = self.path_output
        self.task_list.append(
            Target(CACHE_TYPE_PC, bvid, name, video_path, audio_path, parent_path)
        )

    def pc_cache_decode(self, _path_input, _path_output):
        """Copy a PC cache stream to *_path_output* without its leading marker bytes.

        Two markers are recognised: nine ``0x30`` bytes (nine bytes dropped)
        or three ``0xFF`` bytes (three bytes dropped). For any other header
        nothing is written and the first bytes are logged.
        """
        import shutil  # local import: the file's import block is left untouched

        with open(_path_input, "rb") as src:
            # A short header read replaces loading the whole media file into
            # memory, and comparing bytes is safe for files shorter than a marker.
            head = src.read(64)
            if head[:9] == b"0" * 9:
                skip = 9
            elif head[:3] == b"\xff" * 3:
                skip = 3
            else:
                self.logger.error('maybe has new change: {}'.format(str(list(head))))
                return
            src.seek(skip)
            with open(_path_output, "wb") as dst:
                shutil.copyfileobj(src, dst)
#
# def blv_cache_transform(self):
# pass
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。