123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- import logging
- import os
- import subprocess
- from celery import shared_task
- from Ansjer.config import LOGGER
- import django
- # Set the Django environment variable
- os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'Ansjer.cn_config.test_settings') # Make sure to replace this with the actual settings module path
- django.setup()
def start_xvfb():
    """Make sure an Xvfb virtual display exists and export ``DISPLAY=:99``.

    Probes for an existing Xvfb process with ``pgrep``. If none is found,
    ``Xvfb :99 -screen 0 1024x768x16`` is launched in the background.
    ``DISPLAY`` is set to ``:99`` in both cases. Any failure is logged and
    swallowed; nothing is returned or raised.
    """
    display = ':99'
    try:
        try:
            # Run pgrep directly rather than through a shell: with shell=True,
            # `pgrep -f` can match the wrapping `sh -c "pgrep -f Xvfb"` command
            # line and report Xvfb as running when it is not.
            probe = subprocess.run(
                ['pgrep', '-f', 'Xvfb'],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            already_running = probe.returncode == 0
        except FileNotFoundError:
            # pgrep itself is not installed; fall through and try to start
            # Xvfb, which is what the shell-based probe ended up doing too.
            already_running = False

        if already_running:
            LOGGER.info('虚拟显示服务已在运行')
        else:
            # Start the virtual display server in the background.
            process = subprocess.Popen(
                ['Xvfb', display, '-screen', '0', '1024x768x16']
            )
            LOGGER.info('虚拟显示服务启动成功,进程ID: %s', process.pid)
        os.environ['DISPLAY'] = display
    except Exception as e:
        LOGGER.error('启动虚拟显示服务失败: %s', e)
@shared_task
def generate_video(image_files, output_path):
    """Render a slideshow video from images, with transitions and music.

    For every pair of adjacent images a 2-second clip is rendered with
    ffmpeg's ``gltransition`` filter. The clips are then concatenated into
    ``final_output.mp4`` and the background track is muxed in to produce
    ``final_output_with_music.mp4``. Both files are written to
    ``output_path``.

    Args:
        image_files: Ordered sequence of image paths; at least two are
            required.
        output_path: Directory that receives the intermediate clips, the
            concat list and the final videos.

    Returns:
        None. Failures are logged and not re-raised.
    """
    LOGGER.info('start开始视频生成任务')
    # Start the virtual display service before invoking ffmpeg.
    start_xvfb()

    music_path = "/web/test/ASJServer/static/temp/music.mp3"
    transitions_dir = "/web/test/ASJServer/static/gl-transitions"
    # Available transition shaders.
    transitions = [
        "BowTieHorizontal.glsl",
        "burn.glsl",
        "cube.glsl",
        "pinwheel.glsl",
        "windowslice.glsl",
        "Radial.glsl",
        "rotateTransition.glsl",
        "wind.glsl",
        "squeeze.glsl",
    ]
    # One path for the concat list: it is written, read by ffmpeg and deleted
    # from the same location instead of a separate hard-coded directory.
    filelist_path = os.path.join(output_path, 'filelist.txt')
    video_files = []

    try:
        if len(image_files) < 2:
            raise ValueError('at least two images are required')

        # Render one clip per pair of adjacent images.
        pairs = zip(image_files, image_files[1:])
        for i, (first_image, second_image) in enumerate(pairs):
            output_video = os.path.join(output_path, f'{i + 1}_{i + 2}.mp4')
            video_files.append(output_video)
            # Cycle through the shaders so more pairs than shaders cannot
            # raise IndexError.
            transition = transitions[i % len(transitions)]
            cmd = [
                # -y: overwrite leftovers from an earlier run instead of
                # aborting.
                'ffmpeg', '-y', '-loop', '1', '-i', first_image,
                '-loop', '1', '-i', second_image,
                '-filter_complex',
                f'gltransition=duration=1:offset=0.7:source={transitions_dir}/{transition}',
                '-t', '2', output_video,
            ]
            subprocess.run(cmd, check=True)

        # Write the list file consumed by ffmpeg's concat demuxer.
        with open(filelist_path, 'w', encoding='utf-8') as f:
            f.writelines(f"file '{video}'\n" for video in video_files)

        # Concatenate the clips.
        final_output = os.path.join(output_path, 'final_output.mp4')
        subprocess.run(
            ['ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', filelist_path,
             '-c', 'copy', final_output],
            check=True,
        )

        # Add the background music.
        final_output_with_music = os.path.join(
            output_path, 'final_output_with_music.mp4'
        )
        subprocess.run(
            ['ffmpeg', '-y', '-i', final_output, '-i', music_path,
             '-c:v', 'copy', '-c:a', 'aac', '-b:a', '192k',
             '-shortest', final_output_with_music],
            check=True,
        )
    except Exception as e:
        LOGGER.error('视频合成失败: %s', e)
    finally:
        # Remove temporary files even when a step above failed.
        for temp_file in [*video_files, filelist_path]:
            try:
                os.remove(temp_file)
            except FileNotFoundError:
                # Never created because an earlier step failed.
                pass
            except OSError as e:
                LOGGER.error('清理临时文件失败: %s', e)
|