288 lines
13 KiB
Python
288 lines
13 KiB
Python
# Copyright (C) 2023 by Artem Khrapov (kabachuha)
|
|
# Read LICENSE for usage terms.
|
|
|
|
import time, math
|
|
import subprocess
|
|
import os, shutil
|
|
import cv2
|
|
from modules.shared import state
|
|
from pkg_resources import resource_filename
|
|
import requests
|
|
from mutagen.mp4 import MP4
|
|
|
|
def get_frame_name(path):
|
|
name = os.path.basename(path)
|
|
name = os.path.splitext(name)[0]
|
|
return name
|
|
|
|
def vid2frames(video_path, video_in_frame_path, n=1, overwrite=True, extract_from_frame=0, extract_to_frame=-1, out_img_format='jpg', numeric_files_output = False):
|
|
if (extract_to_frame <= extract_from_frame) and extract_to_frame != -1:
|
|
raise RuntimeError('Error: extract_to_frame can not be higher than extract_from_frame')
|
|
|
|
if n < 1: n = 1 #HACK Gradio interface does not currently allow min/max in gr.Number(...)
|
|
|
|
# check vid path using a function and only enter if we get True
|
|
if is_vid_path_valid(video_path):
|
|
|
|
name = get_frame_name(video_path)
|
|
|
|
vidcap = cv2.VideoCapture(video_path)
|
|
video_fps = vidcap.get(cv2.CAP_PROP_FPS)
|
|
|
|
input_content = []
|
|
if os.path.exists(video_in_frame_path) :
|
|
input_content = os.listdir(video_in_frame_path)
|
|
|
|
# check if existing frame is the same video, if not we need to erase it and repopulate
|
|
if len(input_content) > 0:
|
|
#get the name of the existing frame
|
|
content_name = get_frame_name(input_content[0])
|
|
if not content_name.startswith(name):
|
|
overwrite = True
|
|
|
|
# grab the frame count to check against existing directory len
|
|
frame_count = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
|
|
|
|
# raise error if the user wants to skip more frames than exist
|
|
if n >= frame_count :
|
|
raise RuntimeError('Skipping more frames than input video contains. extract_nth_frames larger than input frames')
|
|
|
|
expected_frame_count = math.ceil(frame_count / n)
|
|
# Check to see if the frame count is matches the number of files in path
|
|
if overwrite or expected_frame_count != len(input_content):
|
|
shutil.rmtree(video_in_frame_path)
|
|
os.makedirs(video_in_frame_path, exist_ok=True) # just deleted the folder so we need to make it again
|
|
input_content = os.listdir(video_in_frame_path)
|
|
|
|
print(f"Trying to extract frames from video with input FPS of {video_fps}. Please wait patiently.")
|
|
if len(input_content) == 0:
|
|
vidcap.set(cv2.CAP_PROP_POS_FRAMES, extract_from_frame) # Set the starting frame
|
|
success,image = vidcap.read()
|
|
count = extract_from_frame
|
|
t=1
|
|
success = True
|
|
while success:
|
|
if state.interrupted:
|
|
return
|
|
if (count <= extract_to_frame or extract_to_frame == -1) and count % n == 0:
|
|
if numeric_files_output == True:
|
|
cv2.imwrite(video_in_frame_path + os.path.sep + f"{t:09}.{out_img_format}" , image) # save frame as file
|
|
else:
|
|
cv2.imwrite(video_in_frame_path + os.path.sep + name + f"{t:09}.{out_img_format}" , image) # save frame as file
|
|
t += 1
|
|
success,image = vidcap.read()
|
|
count += 1
|
|
print(f"Successfully extracted {count} frames from video.")
|
|
else:
|
|
print("Frames already unpacked")
|
|
vidcap.release()
|
|
return video_fps
|
|
|
|
def is_vid_path_valid(video_path):
|
|
# make sure file format is supported!
|
|
file_formats = ["mov", "mpeg", "mp4", "m4v", "avi", "mpg", "webm"]
|
|
extension = video_path.rsplit('.', 1)[-1].lower()
|
|
# vid path is actually a URL, check it
|
|
if video_path.startswith('http://') or video_path.startswith('https://'):
|
|
response = requests.head(video_path, allow_redirects=True)
|
|
if response.status_code == 404:
|
|
raise ConnectionError("Video URL is not valid. Response status code: {}".format(response.status_code))
|
|
elif response.status_code == 302:
|
|
response = requests.head(response.headers['location'], allow_redirects=True)
|
|
if response.status_code != 200:
|
|
raise ConnectionError("Video URL is not valid. Response status code: {}".format(response.status_code))
|
|
if extension not in file_formats:
|
|
raise ValueError("Video file format '{}' not supported. Supported formats are: {}".format(extension, file_formats))
|
|
else:
|
|
if not os.path.exists(video_path):
|
|
raise RuntimeError("Video path does not exist.")
|
|
if extension not in file_formats:
|
|
raise ValueError("Video file format '{}' not supported. Supported formats are: {}".format(extension, file_formats))
|
|
return True
|
|
|
|
|
|
def clean_folder_name(string):
|
|
illegal_chars = "/\\<>:\"|?*.,\" "
|
|
translation_table = str.maketrans(illegal_chars, "_"*len(illegal_chars))
|
|
return string.translate(translation_table)
|
|
|
|
def find_ffmpeg_binary():
|
|
try:
|
|
import google.colab
|
|
return 'ffmpeg'
|
|
except:
|
|
pass
|
|
for package in ['imageio_ffmpeg', 'imageio-ffmpeg']:
|
|
try:
|
|
package_path = resource_filename(package, 'binaries')
|
|
files = [os.path.join(package_path, f) for f in os.listdir(
|
|
package_path) if f.startswith("ffmpeg-")]
|
|
files.sort(key=lambda x: os.path.getmtime(x), reverse=True)
|
|
return files[0] if files else 'ffmpeg'
|
|
except:
|
|
return 'ffmpeg'
|
|
|
|
# Stitch images to a h264 mp4 video using ffmpeg
|
|
def ffmpeg_stitch_video(ffmpeg_location=None, fps=None, outmp4_path=None, stitch_from_frame=0, stitch_to_frame=None, imgs_path=None, add_soundtrack=None, audio_path=None, crf=17, preset='veryslow', metadata=None):
|
|
start_time = time.time()
|
|
|
|
print(f"Got a request to stitch frames to video using FFmpeg.\nFrames:\n{imgs_path}\nTo Video:\n{outmp4_path}")
|
|
msg_to_print = f"Stitching *video*..."
|
|
print(msg_to_print)
|
|
if stitch_to_frame == -1:
|
|
stitch_to_frame = 999999999
|
|
try:
|
|
cmd = [
|
|
ffmpeg_location,
|
|
'-y',
|
|
'-vcodec', 'png',
|
|
'-r', str(float(fps)),
|
|
'-start_number', str(stitch_from_frame),
|
|
'-i', imgs_path,
|
|
'-frames:v', str(stitch_to_frame),
|
|
'-c:v', 'libx264',
|
|
'-vf',
|
|
f'fps={float(fps)}',
|
|
'-pix_fmt', 'yuv420p',
|
|
'-crf', str(crf),
|
|
'-preset', preset,
|
|
'-pattern_type', 'sequence',
|
|
]
|
|
|
|
cmd.append(outmp4_path)
|
|
|
|
process = subprocess.Popen(
|
|
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
stdout, stderr = process.communicate()
|
|
except FileNotFoundError:
|
|
print("\r" + " " * len(msg_to_print), end="", flush=True)
|
|
print(f"\r{msg_to_print}", flush=True)
|
|
raise FileNotFoundError(
|
|
"FFmpeg not found. Please make sure you have a working ffmpeg path under 'ffmpeg_location' parameter.")
|
|
except Exception as e:
|
|
print("\r" + " " * len(msg_to_print), end="", flush=True)
|
|
print(f"\r{msg_to_print}", flush=True)
|
|
raise Exception(
|
|
f'Error stitching frames to video. Actual runtime error:{e}')
|
|
|
|
if add_soundtrack != 'None':
|
|
audio_add_start_time = time.time()
|
|
try:
|
|
cmd = [
|
|
ffmpeg_location,
|
|
'-i',
|
|
outmp4_path,
|
|
'-i',
|
|
audio_path,
|
|
'-map', '0:v',
|
|
'-map', '1:a',
|
|
'-c:v', 'copy',
|
|
'-shortest',
|
|
]
|
|
|
|
cmd.append(outmp4_path+'.temp.mp4')
|
|
|
|
process = subprocess.Popen(
|
|
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
stdout, stderr = process.communicate()
|
|
if process.returncode != 0:
|
|
print("\r" + " " * len(msg_to_print), end="", flush=True)
|
|
print(f"\r{msg_to_print}", flush=True)
|
|
raise RuntimeError(stderr)
|
|
os.replace(outmp4_path+'.temp.mp4', outmp4_path)
|
|
print("\r" + " " * len(msg_to_print), end="", flush=True)
|
|
print(f"\r{msg_to_print}", flush=True)
|
|
print(f"\rFFmpeg Video+Audio stitching \033[0;32mdone\033[0m in {time.time() - start_time:.2f} seconds!", flush=True)
|
|
except Exception as e:
|
|
print("\r" + " " * len(msg_to_print), end="", flush=True)
|
|
print(f"\r{msg_to_print}", flush=True)
|
|
print(f'\rError adding audio to video. Actual error: {e}', flush=True)
|
|
print(f"FFMPEG Video (sorry, no audio) stitching \033[33mdone\033[0m in {time.time() - start_time:.2f} seconds!", flush=True)
|
|
else:
|
|
print("\r" + " " * len(msg_to_print), end="", flush=True)
|
|
print(f"\r{msg_to_print}", flush=True)
|
|
|
|
# adding metadata
|
|
if metadata is not None:
|
|
print('Writing metadata')
|
|
video = MP4(outmp4_path)
|
|
video["\xa9cmt"] = metadata
|
|
video.save()
|
|
|
|
print(f"\rVideo stitching \033[0;32mdone\033[0m in {time.time() - start_time:.2f} seconds!", flush=True)
|
|
|
|
# quick-retreive frame count, FPS and H/W dimensions of a video (local or URL-based)
|
|
def get_quick_vid_info(vid_path):
|
|
vidcap = cv2.VideoCapture(vid_path)
|
|
video_fps = vidcap.get(cv2.CAP_PROP_FPS)
|
|
video_frame_count = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
|
|
video_width = int(vidcap.get(cv2.CAP_PROP_FRAME_WIDTH))
|
|
video_height = int(vidcap.get(cv2.CAP_PROP_FRAME_HEIGHT))
|
|
vidcap.release()
|
|
if video_fps.is_integer():
|
|
video_fps = int(video_fps)
|
|
|
|
return video_fps, video_frame_count, (video_width, video_height)
|
|
|
|
# This function usually gets a filename, and converts it to a legal linux/windows *folder* name
|
|
def clean_folder_name(string):
|
|
illegal_chars = "/\\<>:\"|?*.,\" "
|
|
translation_table = str.maketrans(illegal_chars, "_"*len(illegal_chars))
|
|
return string.translate(translation_table)
|
|
|
|
# used in src/rife/inference_video.py and more, soon
|
|
def duplicate_pngs_from_folder(from_folder, to_folder, img_batch_id, orig_vid_name):
|
|
import cv2
|
|
#TODO: don't copy-paste at all if the input is a video (now it copy-pastes, and if input is deforum run is also converts to make sure no errors rise cuz of 24-32 bit depth differences)
|
|
temp_convert_raw_png_path = os.path.join(from_folder, to_folder)
|
|
if not os.path.exists(temp_convert_raw_png_path):
|
|
os.makedirs(temp_convert_raw_png_path)
|
|
|
|
frames_handled = 0
|
|
for f in os.listdir(from_folder):
|
|
if ('png' in f or 'jpg' in f) and '-' not in f and '_depth_' not in f and ((img_batch_id is not None and f.startswith(img_batch_id) or img_batch_id is None)):
|
|
frames_handled +=1
|
|
original_img_path = os.path.join(from_folder, f)
|
|
if orig_vid_name is not None:
|
|
shutil.copy(original_img_path, temp_convert_raw_png_path)
|
|
else:
|
|
image = cv2.imread(original_img_path)
|
|
new_path = os.path.join(temp_convert_raw_png_path, f)
|
|
cv2.imwrite(new_path, image, [cv2.IMWRITE_PNG_COMPRESSION, 0])
|
|
return frames_handled
|
|
|
|
def add_soundtrack(ffmpeg_location=None, fps=None, outmp4_path=None, stitch_from_frame=0, stitch_to_frame=None, imgs_path=None, add_soundtrack=None, audio_path=None, crf=17, preset='veryslow', metadata=None):
|
|
if add_soundtrack is None:
|
|
return
|
|
msg_to_print = f"Adding soundtrack to *video*..."
|
|
start_time = time.time()
|
|
try:
|
|
cmd = [
|
|
ffmpeg_location,
|
|
'-i',
|
|
outmp4_path,
|
|
'-i',
|
|
audio_path,
|
|
'-map', '0:v',
|
|
'-map', '1:a',
|
|
'-c:v', 'copy',
|
|
'-shortest',
|
|
outmp4_path+'.temp.mp4'
|
|
]
|
|
process = subprocess.Popen(
|
|
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
stdout, stderr = process.communicate()
|
|
if process.returncode != 0:
|
|
print("\r" + " " * len(msg_to_print), end="", flush=True)
|
|
print(f"\r{msg_to_print}", flush=True)
|
|
raise RuntimeError(stderr)
|
|
os.replace(outmp4_path+'.temp.mp4', outmp4_path)
|
|
print("\r" + " " * len(msg_to_print), end="", flush=True)
|
|
print(f"\r{msg_to_print}", flush=True)
|
|
print(f"\rFFmpeg Audio stitching \033[0;32mdone\033[0m in {time.time() - start_time:.2f} seconds!", flush=True)
|
|
except Exception as e:
|
|
print("\r" + " " * len(msg_to_print), end="", flush=True)
|
|
print(f"\r{msg_to_print}", flush=True)
|
|
print(f'\rError adding audio to video. Actual error: {e}', flush=True)
|
|
print(f"FFMPEG Video (sorry, no audio) stitching \033[33mdone\033[0m in {time.time() - start_time:.2f} seconds!", flush=True)
|