(仅供娱乐)基于FFmpeg和PaddleX实现视频分类
代码是一个视频分类程序,它使用PaddleX框架中的深度学习模型对视频截图进行分类推理,并将分类结果应用于视频分类。它的工作流程是:
导入必要的Python库:os、argparse、shlex、subprocess、shutil、paddlex(别名pdx)、cv2,以及collections中的defaultdict。
设置调试模式:DEBUG_MODE由命令行参数--debug_mode控制(默认关闭);当DEBUG_MODE为True时,程序会将视频文件复制到目标文件夹中,而不是移动文件。
设置截图参数:num_screenshots、screenshot_size、screenshot_interval、quality。
设置视频文件所在文件夹路径、截图保存文件夹路径、分类后的视频保存文件夹路径:video_folder、img_folder、video_new_folder。
设置FFmpeg和FFprobe的路径:ffmpeg_path、ffprobe_path。
设置PaddleX分类模型路径:model_path。
加载PaddleX分类模型:model = pdx.load_model(model_path)。
控制FFmpeg是否打印信息的参数:show_ffmpeg_info = False。
获取视频文件列表:video_files = [f for f in os.listdir(video_folder) if os.path.isfile(os.path.join(video_folder, f)) and os.path.splitext(f)[1].lower() in ['.mp4', '.avi', '.mkv']]。
设置阈值参数:threshold = 3。
删除已存在的图片保存文件夹:if os.path.exists(img_folder): shutil.rmtree(img_folder)。
创建图片保存文件夹:os.makedirs(img_folder)。
遍历视频文件列表,对每个视频截取多个图片,并进行分类推理。
输出得分最高的分类和分数值。
如果max_score大于阈值参数,则将视频移动(调试模式下为复制)到以max_score_category为名的目录中。
import os
import argparse
import shlex
import subprocess
import shutil
import paddlex as pdx
import cv2
from collections import defaultdict
def parse_args(argv=None):
    """Parse the command-line options of the video classification script.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads ``sys.argv[1:]``, which is what a
            plain ``parse_args()`` call does.

    Returns:
        argparse.Namespace: one attribute per option declared below.
    """
    parser = argparse.ArgumentParser(description='Video Classification')
    parser.add_argument('--num_screenshots', type=int, default=5, help='Number of screenshots to take')
    parser.add_argument('--screenshot_size', type=str, default='-1:480', help='Size of screenshots to take')
    # Raw string: "\," is not a valid Python escape sequence, so a plain
    # literal triggers an invalid-escape warning. The value itself is
    # unchanged (backslash followed by a comma).
    parser.add_argument('--screenshot_interval', type=str, default=r'not(mod(n\,100))', help='Interval between screenshots')
    parser.add_argument('--quality', type=str, default='2', help='Quality of screenshots')
    parser.add_argument('--video_folder', type=str, default='Downloads/', help='Folder containing videos')
    parser.add_argument('--img_folder', type=str, default='TestVideo_img', help='Folder to save screenshots')
    parser.add_argument('--video_new_folder', type=str, default='TestVideo', help='Folder to move videos to')
    parser.add_argument('--ffmpeg_path', type=str, default='ffmpeg', help='Path to ffmpeg')
    parser.add_argument('--ffprobe_path', type=str, default='ffprobe', help='Path to ffprobe')
    parser.add_argument('--model_path', type=str, default='inference_model', help='Path to classification model')
    parser.add_argument('--show_ffmpeg_info', action='store_true', help='Whether to show ffmpeg information')
    # argparse does not apply ``type`` to non-string defaults, so the default
    # is written as a float to match values supplied on the command line.
    parser.add_argument('--threshold', type=float, default=3.0, help='Threshold for video classification')
    parser.add_argument('--debug_mode', action='store_true', help='Whether to run in debug mode')
    return parser.parse_args(argv)
# File extensions (lower-case) treated as videos when scanning the input folder.
_VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mkv')
# Timestamps, in seconds, of the three fixed screenshots taken for every video.
_FIXED_TIMESTAMPS = (1, 5, 10)


def _probe_duration(ffprobe_path, video_path):
    """Return the duration of *video_path* in seconds as reported by ffprobe.

    Raises:
        subprocess.CalledProcessError: if ffprobe exits with a non-zero status.
        ValueError: if ffprobe's output cannot be parsed as a float.
    """
    # An argument list (no shell, no re-splitting) keeps paths containing
    # spaces or backslashes intact.
    cmd = [
        ffprobe_path,
        '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        video_path,
    ]
    return float(subprocess.check_output(cmd).decode('utf-8').strip())


def _grab_frame(args, video_path, timestamp, img_path):
    """Write the frame of *video_path* at *timestamp* seconds to *img_path*.

    ffmpeg output is discarded unless ``args.show_ffmpeg_info`` is set.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
    """
    cmd = [
        args.ffmpeg_path,
        '-ss', str(timestamp),
        '-i', video_path,
        '-vframes', '1',
        # The default size "-1:480" is scale-filter syntax; ffmpeg's ``-s``
        # option only takes WxH values. The scale filter accepts both
        # "W:H" and "WxH".
        '-vf', f'scale={args.screenshot_size}',
        '-q:v', str(args.quality),
        img_path,
        '-y',
    ]
    sink = None if args.show_ffmpeg_info else subprocess.DEVNULL
    subprocess.run(cmd, check=True, shell=False, stdout=sink, stderr=sink)


def _score_video(model, args, video_path, video_name, img_subfolder):
    """Take the screenshots of one video and sum the model scores per category.

    Returns:
        dict mapping each predicted category to the sum of its scores over
        the classified screenshots.

    Raises:
        RuntimeError: if a screenshot cannot be read back from disk.
    """
    duration = _probe_duration(args.ffprobe_path, video_path)
    interval = duration / (args.num_screenshots + 3)

    # NOTE(review): these three fixed-time screenshots are saved but never
    # passed to the model; only the evenly spaced ones below are classified.
    # Confirm whether that is intended.
    for index, second in enumerate(_FIXED_TIMESTAMPS, start=1):
        img_path = os.path.join(img_subfolder, f"{video_name}_{index}.jpg")
        _grab_frame(args, video_path, second, img_path)

    score_dict = defaultdict(float)
    for step in range(args.num_screenshots):
        # File numbering continues after the three fixed screenshots.
        img_path = os.path.join(img_subfolder, f"{video_name}_{step + 4}.jpg")
        _grab_frame(args, video_path, interval * step, img_path)
        im = cv2.imread(img_path)
        if im is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise RuntimeError(f"could not read screenshot {img_path}")
        for item in model.predict(im.astype('float32')):
            score_dict[item['category']] += item['score']
    return score_dict


def _process_video(model, args, video_file):
    """Classify one video and copy/move it when its best score beats the threshold.

    Raises:
        RuntimeError: if no classification result was produced for the video.
    """
    video_path = os.path.join(args.video_folder, video_file)
    video_name = os.path.splitext(video_file)[0]
    img_subfolder = os.path.join(args.img_folder, video_name)
    # exist_ok: two videos may share a stem (e.g. "a.mp4" and "a.avi").
    os.makedirs(img_subfolder, exist_ok=True)

    score_dict = _score_video(model, args, video_path, video_name, img_subfolder)
    if not score_dict:
        raise RuntimeError("no classification results were produced")

    max_score_category = max(score_dict, key=score_dict.get)
    max_score = score_dict[max_score_category]
    if max_score <= args.threshold:
        print(f"Video: {video_file}, Max Score Category: {max_score_category}, Max Score: {max_score}.")
        return

    target_folder = os.path.join(args.video_new_folder, max_score_category)
    os.makedirs(target_folder, exist_ok=True)
    destination = os.path.join(target_folder, video_file)
    if args.debug_mode:
        # Debug mode leaves the source video in place.
        shutil.copy(video_path, destination)
        action = "copied"
    else:
        shutil.move(video_path, destination)
        action = "moved"
    print(f"Video: {video_file}, Max Score Category: {max_score_category}, Max Score: {max_score}, {action} to {target_folder}.")


def main():
    """Classify every video in the input folder and sort it by predicted category."""
    args = parse_args()

    print("Loading model...")
    model = pdx.load_model(args.model_path)
    print("Model loaded.")

    video_files = [
        f for f in os.listdir(args.video_folder)
        if os.path.isfile(os.path.join(args.video_folder, f))
        and os.path.splitext(f)[1].lower() in _VIDEO_EXTENSIONS
    ]

    # Start from an empty screenshot folder on every run.
    if os.path.exists(args.img_folder):
        shutil.rmtree(args.img_folder)
    os.makedirs(args.img_folder)

    for video_file in video_files:
        try:
            _process_video(model, args, video_file)
        except Exception as e:
            # Best effort: report the failing video and continue with the rest.
            print(f"Error processing {video_file}: {e}")


if __name__ == '__main__':
    main()


