代码拉取完成,页面将自动刷新
from flask import Flask, request, jsonify, render_template, send_file, abort, redirect
import subprocess
import os
import hashlib
import psutil
# Flask application object that the route decorators in this file register on.
app = Flask(__name__)
# Catalogue of discovered videos; each entry is {'path': ..., 'id': ...}.
VIDEO_FILES = []
# Push processes that have been started, keyed by stream id.
PUSH_LIST = {}
# Alternative (Windows) settings, kept disabled:
#video_path = "J:\\演示视频"
#FFMPEG_EXE = "d:/ffmpeg4.3.2-Win/ffmpeg"
# Directory that is scanned for video files and that receives uploads.
video_path = "/home/lx/ZLMediaKit/release/linux/Debug/py/files"
# ffmpeg executable used for pushing streams and for grabbing preview frames.
FFMPEG_EXE = '/home/lx/ZLMediaKit/release/linux/Debug/ffmpeg'
def calculate_md5(input_string):
    """Return the hexadecimal MD5 digest of *input_string* encoded as UTF-8."""
    encoded = input_string.encode('utf-8')
    return hashlib.md5(encoded).hexdigest()
def get_video_files(directory):
    """Recursively collect the video files found under *directory*.

    A file counts as a video when its name ends, case-insensitively, with one
    of the recognised extensions.

    Args:
        directory: Root directory to walk.

    Returns:
        A list of paths (each joined from the walked directory and the file
        name), in the order ``os.walk`` yields them. The list is empty when
        nothing matches or the directory does not exist.
    """
    # '.264' and '.265' are included because ALLOWED_EXTENSIONS accepts them
    # on upload; without them such uploads would be stored but never listed.
    valid_extensions = ('.mp4', '.h264', '.h265', '.264', '.265')
    video_files = []
    for root, _dirs, files in os.walk(directory):
        video_files.extend(
            os.path.join(root, name)
            for name in files
            if name.lower().endswith(valid_extensions)
        )
    return video_files
# Terminate a process together with all of its descendant processes.
def terminate_process_and_children(pid, timeout=3):
    """Terminate the process *pid* and every descendant of it.

    Descendants are signalled first, then the process itself. Anything still
    alive after *timeout* seconds is killed. Processes that have already
    disappeared are skipped instead of raising.

    Args:
        pid: Process id of the root of the tree to stop.
        timeout: Seconds to wait for a graceful exit before killing, and
            again for the killed processes to go away.
    """
    try:
        parent = psutil.Process(pid)
    except psutil.NoSuchProcess:
        # Nothing to stop: the process has already gone.
        return
    try:
        procs = parent.children(recursive=True)
    except psutil.NoSuchProcess:
        procs = []
    # Keep the original order: children first, the parent last.
    procs.append(parent)
    for proc in procs:
        try:
            proc.terminate()
        except psutil.NoSuchProcess:
            pass
    # A bounded wait, so a process ignoring the request cannot block forever.
    _gone, alive = psutil.wait_procs(procs, timeout=timeout)
    for proc in alive:
        try:
            proc.kill()
        except psutil.NoSuchProcess:
            pass
    if alive:
        psutil.wait_procs(alive, timeout=timeout)
def video_files_info_fflush():
    """Rebuild the global VIDEO_FILES catalogue from the files under video_path.

    The list is emptied in place and refilled with one ``{'path', 'id'}``
    entry per discovered file, where ``id`` is the MD5 hex digest of the path.
    """
    VIDEO_FILES.clear()
    VIDEO_FILES.extend(
        {'path': found, 'id': calculate_md5(found)}
        for found in get_video_files(video_path)
    )
# First API endpoint: hook invoked when a requested stream is not found.
@app.route('/api/zlm_hook/on_stream_not_found', methods=['POST'])
def on_stream_not_found():
    """Start an ffmpeg push for the requested stream if it maps to a known video.

    Reads the JSON body's ``app`` and ``stream`` fields. ``stream`` is matched
    against the ids in VIDEO_FILES; on a match ffmpeg is started to loop the
    file to the RTMP URL built from ``app`` and ``stream``, and the process is
    recorded in PUSH_LIST. A push already recorded for the same stream is
    stopped first.

    Returns:
        JSON ``{'code': 0, 'msg': 'success'}`` once ffmpeg has been started,
        otherwise ``{'code': -1, 'msg': ...}`` describing the failure.
    """
    # silent=True gives None instead of raising on a missing or invalid body.
    data = request.get_json(silent=True)
    print(data)
    if not isinstance(data, dict):
        return jsonify({'code': -1, 'msg': 'arg err'})
    app_name = data.get('app')
    stream = data.get('stream')
    # Both values are concatenated into the RTMP URL, so they must be strings.
    if not isinstance(app_name, str) or not isinstance(stream, str):
        return jsonify({'code': -1, 'msg': 'arg err'})

    inurl = next(
        (item['path'] for item in VIDEO_FILES if item['id'] == stream), '')
    if not inurl:
        return jsonify({'code': -1, 'msg': 'file not exists'})
    print("需要进行推流: {}/{}".format(app_name, stream))
    ffmpeg_command = [FFMPEG_EXE,
                      '-re',
                      '-stream_loop',
                      '-1',
                      '-i',
                      inurl,
                      '-an',
                      '-vcodec', 'copy', '-f', 'flv',
                      'rtmp://192.168.0.65:1935/' + app_name + '/' + stream]
    # Only used for logging; the process itself is started from the list.
    ffmpeg_command_str = ' '.join(ffmpeg_command)

    # Stop a push that is already recorded for this stream before restarting.
    previous = PUSH_LIST.pop(stream, None)
    if previous is not None:
        if previous.poll() is None:
            terminate_process_and_children(previous.pid)
        previous.wait()

    # Pass the argument list directly, without a shell: the file path and the
    # app name come from outside and must not be interpreted as shell syntax
    # (this also keeps paths containing spaces intact).
    try:
        process = subprocess.Popen(ffmpeg_command)
    except OSError as exc:
        print("推流启动失败>{}, err:{}".format(ffmpeg_command_str, exc))
        return jsonify({'code': -1, 'msg': 'ffmpeg start failed'})
    # Remember the process so that it can be stopped later.
    PUSH_LIST[stream] = process
    print("推流启动完成>{}, pid:{}".format(ffmpeg_command_str, process.pid))
    response_data = {'code': 0, 'msg': 'success'}
    return jsonify(response_data)
# Second API endpoint: hook invoked when a stream has no viewers left.
@app.route('/api/zlm_hook/on_stream_none_reader', methods=['POST'])
def on_stream_none_reader():
    """Stop the ffmpeg push recorded for a stream that nobody is watching.

    Reads the JSON body's ``stream`` field. If a process is recorded for it in
    PUSH_LIST, the process tree is terminated and the entry removed.

    Returns:
        JSON ``{'code': 0, 'msg': 'success'}``, or
        ``{'code': -1, 'msg': 'arg err'}`` when the body is not a JSON object.
    """
    # silent=True gives None instead of raising on a missing or invalid body.
    data = request.get_json(silent=True)
    print('无人观看:{}'.format(data))
    if not isinstance(data, dict):
        return jsonify({'code': -1, 'msg': 'arg err'})
    stream = data.get('stream')

    # Non-string values cannot be keys of PUSH_LIST (and may be unhashable).
    process = PUSH_LIST.get(stream) if isinstance(stream, str) else None
    if process is not None:
        print(f'关闭{stream}无人观看任务:{process.pid}')
        # Original author's note: "closing this way does not succeed"; it sat
        # next to a disabled os.kill(process.pid, 9) attempt.
        try:
            # Skip the termination when the process has already exited.
            if process.poll() is None:
                terminate_process_and_children(process.pid)
                process.wait()
        finally:
            # Always drop the entry so a failed stop cannot leave it behind.
            PUSH_LIST.pop(stream, None)
    response_data = {'code': 0, 'msg': 'success'}
    return jsonify(response_data)
def capture_frame(video_path, output_image, time_point='00:00:05'):
    """Run ffmpeg to write one frame of *video_path* to *output_image*.

    Args:
        video_path: Path of the input video file.
        output_image: Path of the image file to write.
        time_point: Accepted for callers, but not used when the command is
            built.
    """
    source_args = ['-i', video_path]       # input video file
    frame_args = ['-vframes', '1', '-y']   # a single frame; '-y' = overwrite
    command = [FFMPEG_EXE] + source_args + frame_args + [output_image]
    subprocess.run(command)
@app.route('/api/video_image/<id>')
def api_video_image(id):
    """Return a JPEG preview frame for the video whose id is given in the URL.

    Anything after the first '.' in the URL segment is ignored, so both
    ``<id>`` and ``<id>.jpg`` work. The frame is captured with ffmpeg on each
    request and stored as ``cache/<id>.jpg``.

    Args:
        id: URL segment holding the video id, optionally with an extension.

    Returns:
        The image response. Aborts with 404 when the id is unknown or no image
        file is available after the capture.
    """
    print("返回图片:{}".format(id))
    id = id.split('.')[0]
    inurl = next(
        (item['path'] for item in VIDEO_FILES if item['id'] == id), '')
    if not inurl:
        return abort(404)
    # Use an absolute path: ffmpeg resolves a relative path against the working
    # directory, whereas send_file may resolve it against the application
    # root, so the two could otherwise point at different files.
    cache_dir = os.path.abspath('cache')
    # exist_ok avoids the check-then-create race between parallel requests.
    os.makedirs(cache_dir, exist_ok=True)
    image_path = os.path.join(cache_dir, f"{id}.jpg")
    # Capture the frame with ffmpeg.
    capture_frame(inurl, image_path)
    if not os.path.isfile(image_path):
        # ffmpeg produced nothing (for example an unreadable input file).
        return abort(404)
    return send_file(image_path, mimetype='image/jpeg')
@app.route('/api/video_list')
def api_video_list():
    """Return the video catalogue as JSON.

    Each entry carries ``id``, ``path`` and ``pid``; ``pid`` is the pid of the
    process recorded in PUSH_LIST for that id, or '' when there is none.
    """
    videos = [
        {
            'id': info['id'],
            'path': info['path'],
            'pid': PUSH_LIST[info['id']].pid if info['id'] in PUSH_LIST else '',
        }
        for info in VIDEO_FILES
    ]
    return jsonify({'code': 0, 'msg': 'success', 'data': videos})
@app.route('/api/video_flush')
def api_video_flush():
    """Rescan the video directory, then answer with a JSON success message."""
    video_files_info_fflush()
    result = {'code': 0, 'msg': 'success'}
    return jsonify(result)
@app.route('/')
def index():
    """Render index1.html with the video catalogue.

    The template receives ``videos``, a list of ``{'id', 'path', 'pid'}``
    dicts; ``pid`` is '' for videos without a process recorded in PUSH_LIST.
    """
    videos = []
    for info in VIDEO_FILES:
        video_id = info['id']
        is_pushing = video_id in PUSH_LIST
        videos.append({
            'id': video_id,
            'path': info['path'],
            'pid': PUSH_LIST[video_id].pid if is_pushing else '',
        })
    return render_template('index1.html', videos=videos)
# File types (extension without the dot) that may be uploaded.
ALLOWED_EXTENSIONS = {'mp4', 'h264', 'h265', '264', '265' }
def allowed_file(filename):
    """Return True when *filename* has an extension in ALLOWED_EXTENSIONS.

    The extension is the text after the last '.', compared case-insensitively.
    Names without a '.' are rejected.
    """
    _stem, dot, extension = filename.rpartition('.')
    return dot == '.' and extension.lower() in ALLOWED_EXTENSIONS
@app.route('/upload', methods=['POST'])
def upload():
    """Store an uploaded video in video_path and refresh the catalogue.

    Expects a multipart form field named ``file``. The file is saved only when
    its name has an allowed extension; in every case where the field is
    present, the catalogue is rescanned and the client is redirected to '/'.

    Returns:
        JSON ``{'error': 'No file part'}`` when the field is missing,
        otherwise a 302 redirect to '/'.
    """
    # Check that a file was actually submitted.
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'})
    file = request.files['file']
    # The submitted name is controlled by the client. Keep only its final
    # component (treating both '/' and '\\' as separators) so that it cannot
    # point outside video_path.
    filename = os.path.basename((file.filename or '').replace('\\', '/'))
    # Check that the file type is allowed.
    if filename and allowed_file(filename):
        # Save the file into the video directory.
        file.save(os.path.join(video_path, filename))
    video_files_info_fflush()
    return redirect('/', code=302, Response=None)
if __name__ == '__main__':
    # Build the initial video catalogue and print it before serving.
    video_files_info_fflush()
    print(VIDEO_FILES)
    # Start the Flask application, listening on every interface at port 8099.
    app.run(host='0.0.0.0', port=8099)
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。