(仅供娱乐)基于FFmpeg和PaddleX实现视频分类
代码是一个视频分类程序,它使用PaddleX框架中的深度学习模型对视频截图进行分类推理,并将分类结果应用于视频分类。它的工作流程是:
导入必要的Python库:os、argparse、shlex、subprocess、shutil、paddlex、cv2、defaultdict。
设置调试模式:DEBUG_MODE 由命令行参数 --debug_mode 控制(默认关闭);当DEBUG_MODE为True时,程序会将视频文件复制到目标文件夹中,而不是移动文件。
设置截图参数:num_screenshots、screenshot_size、screenshot_interval、quality。
设置视频文件所在文件夹路径、截图保存文件夹路径、分类后的视频保存文件夹路径:video_folder、img_folder、video_new_folder。
设置FFmpeg和FFprobe的路径:ffmpeg_path、ffprobe_path。
设置PaddleX分类模型路径:model_path。
加载PaddleX分类模型:model = pdx.load_model(model_path)。
控制FFmpeg是否打印信息的参数:show_ffmpeg_info = False。
获取视频文件列表:video_files = [f for f in os.listdir(video_folder) if os.path.isfile(os.path.join(video_folder, f)) and os.path.splitext(f)[1].lower() in ['.mp4', '.avi', '.mkv']]。
设置阈值参数:threshold = 3。
删除已存在的图片保存文件夹:if os.path.exists(img_folder): shutil.rmtree(img_folder)。
创建图片保存文件夹:os.makedirs(img_folder)。
遍历视频文件列表,对每个视频截取多个图片,并进行分类推理。
输出得分最高的分类和分数值。
如果max_score大于阈值参数,则将视频移动(调试模式下为复制)到以max_score_category为名的目录中。
import argparse
import os
import shlex
import shutil
import subprocess
from collections import defaultdict

import cv2
import paddlex as pdx

# Video container extensions (lower-case) that the script will pick up.
VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mkv')

# Fixed offsets (seconds) of the extra screenshots taken near the start of
# each video, in addition to the evenly spaced ones.
FIXED_OFFSETS = (1, 5, 10)


def parse_args(argv=None):
    """Parse the command-line options of the video classifier.

    Args:
        argv: Optional list of argument strings. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``.

    Returns:
        The populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(description='Video Classification')
    parser.add_argument('--num_screenshots', type=int, default=5,
                        help='Number of screenshots to take')
    parser.add_argument('--screenshot_size', type=str, default='-1:480',
                        help='Size of screenshots to take')
    # Raw string: the backslash is part of the ffmpeg expression, and "\,"
    # is not a valid Python escape sequence.
    # NOTE(review): this option is parsed but not used anywhere in the script.
    parser.add_argument('--screenshot_interval', type=str,
                        default=r'not(mod(n\,100))',
                        help='Interval between screenshots')
    parser.add_argument('--quality', type=str, default='2',
                        help='Quality of screenshots')
    parser.add_argument('--video_folder', type=str, default='Downloads/',
                        help='Folder containing videos')
    parser.add_argument('--img_folder', type=str, default='TestVideo_img',
                        help='Folder to save screenshots')
    parser.add_argument('--video_new_folder', type=str, default='TestVideo',
                        help='Folder to move videos to')
    parser.add_argument('--ffmpeg_path', type=str, default='ffmpeg',
                        help='Path to ffmpeg')
    parser.add_argument('--ffprobe_path', type=str, default='ffprobe',
                        help='Path to ffprobe')
    parser.add_argument('--model_path', type=str, default='inference_model',
                        help='Path to classification model')
    parser.add_argument('--show_ffmpeg_info', action='store_true',
                        help='Whether to show ffmpeg information')
    parser.add_argument('--threshold', type=float, default=3,
                        help='Threshold for video classification')
    parser.add_argument('--debug_mode', action='store_true',
                        help='Whether to run in debug mode')
    return parser.parse_args(argv)


def _probe_duration(ffprobe_path, video_path):
    """Return the duration of *video_path* in seconds as reported by ffprobe."""
    cmd = [
        ffprobe_path, '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        video_path,
    ]
    return float(subprocess.check_output(cmd).decode('utf-8').strip())


def _screenshot_cmd(ffmpeg_path, video_path, img_path, seek_seconds, size,
                    quality):
    """Build the ffmpeg argv that grabs one frame at *seek_seconds*."""
    # "-s" only accepts WxH (or a size abbreviation); a "w:h" value such as
    # the default "-1:480" is scale-filter syntax and must go through -vf.
    if ':' in size:
        size_args = ['-vf', f'scale={size}']
    else:
        size_args = ['-s', size]
    return [
        ffmpeg_path, '-y',
        # Fixed-point formatting avoids exponent notation for tiny offsets.
        '-ss', f'{seek_seconds:.3f}',
        '-i', video_path,
        '-vframes', '1',
        *size_args,
        '-q:v', str(quality),
        img_path,
    ]


def _capture_frame(args, video_path, img_path, seek_seconds):
    """Run ffmpeg to save one screenshot; raises CalledProcessError on failure."""
    cmd = _screenshot_cmd(args.ffmpeg_path, video_path, img_path,
                          seek_seconds, args.screenshot_size, args.quality)
    # DEVNULL discards the output instead of buffering it in memory.
    sink = None if args.show_ffmpeg_info else subprocess.DEVNULL
    subprocess.run(cmd, check=True, shell=False, stdout=sink, stderr=sink)


def _process_video(video_file, args, model):
    """Screenshot one video, classify the frames and file the video away.

    Scores of every predicted category are summed over the evenly spaced
    screenshots. If the best total exceeds ``args.threshold`` the video is
    moved (copied in debug mode) into a sub-folder of
    ``args.video_new_folder`` named after that category.

    Raises:
        ValueError: if no screenshot could be classified.
        subprocess.CalledProcessError: if ffprobe/ffmpeg fails.
    """
    video_path = os.path.join(args.video_folder, video_file)
    video_name = os.path.splitext(video_file)[0]
    img_subfolder = os.path.join(args.img_folder, video_name)
    # exist_ok: two videos may share a stem (e.g. a.mp4 and a.mkv).
    os.makedirs(img_subfolder, exist_ok=True)

    duration = _probe_duration(args.ffprobe_path, video_path)
    interval = duration / (args.num_screenshots + 3)

    # NOTE(review): the original indentation was lost; these fixed-offset
    # frames appear to be saved only and never classified -- confirm.
    for index, offset in enumerate(FIXED_OFFSETS, start=1):
        if offset >= duration:
            continue  # nothing to grab past the end of a short clip
        img_path = os.path.join(img_subfolder, f"{video_name}_{index}.jpg")
        _capture_frame(args, video_path, img_path, offset)

    score_dict = defaultdict(float)
    for i in range(1, args.num_screenshots + 1):
        # File numbering continues after the three fixed-offset frames.
        img_path = os.path.join(img_subfolder, f"{video_name}_{i + 3}.jpg")
        _capture_frame(args, video_path, img_path, interval * (i - 1))

        im = cv2.imread(img_path)
        if im is None:
            # imread signals a missing/unreadable file by returning None.
            print(f"Warning: could not read screenshot {img_path}, skipping.")
            continue
        for item in model.predict(im.astype('float32')):
            score_dict[item['category']] += item['score']

    if not score_dict:
        raise ValueError("no screenshot could be classified")

    max_score_category = max(score_dict, key=score_dict.get)
    max_score = score_dict[max_score_category]
    if max_score > args.threshold:
        target_folder = os.path.join(args.video_new_folder, max_score_category)
        os.makedirs(target_folder, exist_ok=True)
        # Debug mode leaves the source video in place.
        transfer = shutil.copy if args.debug_mode else shutil.move
        transfer(video_path, os.path.join(target_folder, video_file))
        print(f"Video: {video_file}, Max Score Category: {max_score_category}, Max Score: {max_score}, moved to {target_folder}.")
    else:
        print(f"Video: {video_file}, Max Score Category: {max_score_category}, Max Score: {max_score}.")


def main():
    """Classify every video in the input folder and sort it by category."""
    args = parse_args()

    print("Loading model...")
    model = pdx.load_model(args.model_path)
    print("Model loaded.")

    video_files = [
        name for name in os.listdir(args.video_folder)
        if os.path.isfile(os.path.join(args.video_folder, name))
        and os.path.splitext(name)[1].lower() in VIDEO_EXTENSIONS
    ]

    # Start from an empty screenshot folder on every run.
    if os.path.exists(args.img_folder):
        shutil.rmtree(args.img_folder)
    os.makedirs(args.img_folder)

    for video_file in video_files:
        # Best-effort batch: one broken video must not stop the others.
        try:
            _process_video(video_file, args, model)
        except Exception as e:
            print(f"Error processing {video_file}: {e}")


if __name__ == '__main__':
    main()