# NOTE: web-page residue, not part of the module: "Code pull finished; the page will refresh automatically."
from sqlalchemy import create_engine, Column, Integer, String, Boolean, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import uuid
# Declarative base class: every ORM model in this module subclasses it, and
# its metadata is what SqlHandler passes to create_all() to create the tables.
# NOTE(review): in SQLAlchemy 1.4+ the preferred import location is
# sqlalchemy.orm.declarative_base; confirm against the pinned version.
Base = declarative_base()
# Video model
class Video(Base):
    """ORM mapping for the ``video`` table: one row per source video URL.

    The row tracks two independent integer state fields, ``status`` for the
    download step and ``clip_status`` for the processing step.
    """

    __tablename__ = 'video'

    # Primary key: a random UUID4 rendered as a string, generated when the
    # caller does not supply one.
    video_uuid = Column(
        String,
        primary_key=True,
        default=lambda: str(uuid.uuid4()),
    )

    # Source URL of the video; required.
    url = Column(String, nullable=False)

    # Video name; not used for now.
    name = Column(String, nullable=True)

    # Download status: 0 = not downloaded, 1 = downloading,
    # 2 = downloaded, 3 = download failed.
    status = Column(Integer, nullable=False, default=0)

    # Processing status: 0 = not processed, 1 = processing, 2 = processed.
    clip_status = Column(Integer, nullable=False, default=0)

    # Free-form error text; defaults to the empty string.
    error_info = Column(String, nullable=True, default='')

    # Path of the file on disk (presumably the downloaded video; confirm
    # against the downloader); defaults to the empty string.
    file_path = Column(String, nullable=True, default='')

    def __repr__(self):
        """Return a short debug string with the key, URL and both statuses."""
        return (
            f"<Video(id={self.video_uuid}, url={self.url}, "
            f"status={self.status}, clip_status={self.clip_status})>"
        )
class CropVideo(Base):
    """ORM mapping for the ``crop_video`` table: one row per cropped clip.

    Each row references its parent ``video`` row through ``video_id``.
    """

    __tablename__ = 'crop_video'

    # Primary key: an automatically generated UUID4 string.
    crop_video_id = Column(
        String,
        primary_key=True,
        default=lambda: str(uuid.uuid4()),
    )

    # Foreign key to video.video_uuid; must not be NULL.
    video_id = Column(
        String,
        ForeignKey('video.video_uuid'),
        nullable=False,
    )

    # Clip boundaries within the parent video.
    # NOTE(review): unit (frames vs. seconds) is not visible here; confirm.
    start = Column(Integer, nullable=True, default=0)
    end = Column(Integer, nullable=True, default=0)

    # Bounding box serialized as a string.
    # NOTE(review): the serialization format is not visible here; confirm.
    bbox = Column(String, nullable=True, default='')

    # Dimensions of the crop (presumably pixels; confirm against the writer).
    width = Column(Integer, nullable=True, default=0)
    height = Column(Integer, nullable=True, default=0)

    # Integer class label for the person in the clip; meaning of the values
    # is defined by the caller.
    person_class = Column(Integer, nullable=True, default=0)

    # Where the cropped clip is saved; defaults to the empty string.
    save_path = Column(String, nullable=True, default='')

    def __repr__(self):
        """Return a short debug string with the clip key and parent key."""
        return (
            f"<CropVideo(id={self.crop_video_id}, "
            f"video_id={self.video_id})>"
        )
class SqlHandler:
    """Small data-access helper that owns one SQLAlchemy session on SQLite.

    The handler creates the tables on construction and exposes CRUD helpers
    for ``Video`` rows plus an insert helper for ``CropVideo`` rows. Every
    write is committed immediately; if a commit fails the session is rolled
    back before the exception is re-raised, so the handler stays usable.

    It can be used as a context manager, or closed explicitly via ``close()``.
    """

    # Values stored in ``Video.status`` (download state).
    STATUS_PENDING = 0      # not downloaded yet
    STATUS_DOWNLOADING = 1  # download in progress
    STATUS_DOWNLOADED = 2   # download finished
    STATUS_FAILED = 3       # download failed

    def __init__(self, db_name: str = "videos"):
        """Open (or create) ``<db_name>.db`` and make sure the tables exist.

        :param db_name: database file name without the ``.db`` suffix
        """
        # Create the database connection and the Session factory.
        engine = create_engine('sqlite:///{}.db'.format(db_name))
        Session = sessionmaker(bind=engine)
        Base.metadata.create_all(engine)
        self.session = Session()

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def close(self) -> None:
        """Close the session if one was created; safe to call repeatedly."""
        # getattr guard: __init__ may have raised before self.session was set.
        session = getattr(self, "session", None)
        if session is not None:
            session.close()

    def __enter__(self):
        """Return the handler itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the session on leaving the ``with`` block."""
        self.close()

    def __del__(self):
        """Best-effort cleanup when the handler is garbage collected."""
        self.close()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _commit(self) -> None:
        """Commit the session, rolling back before re-raising on failure."""
        try:
            self.session.commit()
        except Exception:
            # A failed flush/commit leaves the session in a state that
            # rejects further work until it is rolled back.
            self.session.rollback()
            raise

    def _get_video(self, video_uuid):
        """Return the ``Video`` with this primary key, or ``None``."""
        return self.session.query(Video).filter_by(video_uuid=video_uuid).first()

    def _set_status(self, video_uuid, status: int) -> None:
        """Set ``Video.status`` and commit; a no-op for an unknown uuid."""
        video = self._get_video(video_uuid)
        if video:
            video.status = status
            self._commit()

    def _pending_videos(self, number):
        """Return a query for at most ``number`` not-yet-downloaded videos."""
        return (
            self.session.query(Video)
            .filter_by(status=self.STATUS_PENDING)
            .limit(number)
        )

    # ------------------------------------------------------------------
    # CropVideo
    # ------------------------------------------------------------------
    def add_crop_video(self, video_uuid, start=0, end=0, bbox='', width=0, height=0, person_class=0, save_path=''):
        """Insert a ``CropVideo`` row linked to ``video_uuid`` and commit.

        :param video_uuid: primary key of the parent ``Video`` row
        :return: the newly created ``CropVideo`` instance
        """
        new_crop_video = CropVideo(
            video_id=video_uuid,
            start=start,
            end=end,
            bbox=bbox,
            width=width,
            height=height,
            person_class=person_class,
            save_path=save_path,
        )
        self.session.add(new_crop_video)
        self._commit()
        return new_crop_video

    # ------------------------------------------------------------------
    # Video: writes
    # ------------------------------------------------------------------
    def set_video_status(self, video_uuid, key_name, value):
        """Set attribute ``key_name`` of a video to ``value`` and commit.

        Does nothing when no video has this uuid.

        :param video_uuid: primary key of the video
        :param key_name: attribute name to assign, e.g. ``"clip_status"``
        :param value: new value for that attribute
        """
        video = self._get_video(video_uuid)
        if video:
            # NOTE(review): key_name is not validated; a name that is not a
            # mapped column is set on the Python object only and never stored.
            setattr(video, key_name, value)
            self._commit()

    def insert_video(self, url) -> str:
        """Return the uuid of the video with this URL, inserting it if new.

        :param url: source URL of the video
        :return: ``video_uuid`` of the existing or newly inserted row
        """
        # Check whether the same URL is already stored.
        existing_video = self.session.query(Video).filter_by(url=url).first()
        if existing_video is not None:
            return existing_video.video_uuid

        new_video = Video(url=url)
        self.session.add(new_video)
        self._commit()
        return new_video.video_uuid

    def set_video_download(self, video_uuid):
        """Mark the video as "downloading" (status 1).

        :param video_uuid: primary key of the video
        """
        self._set_status(video_uuid, self.STATUS_DOWNLOADING)

    def set_video_download_error(self, video_uuid):
        """Mark the video as "download failed" (status 3).

        :param video_uuid: primary key of the video
        """
        self._set_status(video_uuid, self.STATUS_FAILED)

    def set_video_downloaded_file(self, video_uuid):
        """Mark the video as "downloaded" (status 2).

        :param video_uuid: primary key of the video
        """
        self._set_status(video_uuid, self.STATUS_DOWNLOADED)

    def update_video(self, id, url=None, name=None):
        """Update the URL and/or name of a video and commit.

        Falsy arguments (``None`` or ``''``) leave the matching field
        unchanged. Does nothing when no video has this uuid.

        :param id: primary key (``video_uuid``) of the video
        :param url: new URL, if truthy
        :param name: new name, if truthy
        """
        video = self._get_video(id)
        if video:
            if url:
                video.url = url
            if name:
                video.name = name
            self._commit()

    def delete_video(self, id):
        """Delete the video with this uuid, if it exists, and commit.

        :param id: primary key (``video_uuid``) of the video
        """
        # NOTE(review): crop_video rows that reference this video are not
        # removed here; confirm whether orphaned rows are acceptable.
        video = self._get_video(id)
        if video:
            self.session.delete(video)
            self._commit()

    # ------------------------------------------------------------------
    # Video: reads
    # ------------------------------------------------------------------
    def get_video_url_list(self, number):
        """Return the URLs of at most ``number`` not-yet-downloaded videos."""
        return [video.url for video in self._pending_videos(number)]

    def get_video_uuid_list(self, number):
        """Return the uuids of at most ``number`` not-yet-downloaded videos."""
        return [video.video_uuid for video in self._pending_videos(number)]

    def get_video_uuid_and_url_list(self, number):
        """Return ``(uuid, url)`` pairs for at most ``number`` pending videos."""
        return [(video.video_uuid, video.url) for video in self._pending_videos(number)]

    def get_video_by_id(self, id):
        """Return the ``Video`` with this uuid, or ``None`` if absent."""
        return self._get_video(id)

    def get_all_videos(self):
        """Return every ``Video`` row as a list."""
        return self.session.query(Video).all()

    def get_total_videos_count(self) -> int:
        """Return the total number of ``Video`` rows."""
        return self.session.query(Video).count()

    def get_undownloaded_videos_count(self) -> int:
        """Return the number of videos whose status is "not downloaded"."""
        return self.session.query(Video).filter_by(status=self.STATUS_PENDING).count()
# Usage example: a small manual smoke test against the default database file.
if __name__ == "__main__":
    db = SqlHandler()

    # Insert the sample URLs; insert_video returns the existing uuid when a
    # URL is already stored, so re-running does not duplicate rows.
    sample_urls = (
        "http://example.com/video6000.mp4",
        "http://example.com/video2.mp4",
        "http://example.com/video777.mp4",
    )
    inserted_ids = [db.insert_video(link) for link in sample_urls]
    first_id = inserted_ids[0]
    # print(f"Inserted video with ID: {first_id}")

    # Update a row (disabled).
    # db.update_video(first_id, url="http://example.com/new_videonew.mp4")

    # Fetch a single row by primary key.
    fetched = db.get_video_by_id(first_id)
    # print(f"Video: {fetched}")

    # Fetch every row.
    all_rows = db.get_all_videos()
    print(f"All videos: {all_rows}")

    # Count the rows that are still waiting to be downloaded.
    pending_count = db.get_undownloaded_videos_count()
    print(f"All undownloaded videos: {pending_count}")

    # Delete the first inserted row again.
    db.delete_video(first_id)
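# NOTE: web-page residue, not part of the module: "This section may contain content unsuitable for display, so the page does not show it. You can review and modify it using the relevant editing features."
# "If you confirm that the content does not involve inappropriate language, pure advertising, violence, vulgar or pornographic material, infringement, piracy, false or worthless content, or content that violates applicable national laws and regulations, you may click Submit to appeal, and we will handle it as soon as possible."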