# yapf: disable
import glob
import json
import logging
import os
import shutil
import subprocess
from pathlib import Path
from typing import Any, Tuple, Union
import cv2
import numpy as np
from .log_utils import get_logger
from .path_utils import (
Existence,
check_path,
check_path_suffix,
prepare_output_path,
)
# yapf: enable
class VideoInfoReader():
    """Expose the properties of a file's first video stream, read by ffprobe.

    The implementation mimics ffmpeg-python
    (https://github.com/kkroening/ffmpeg-python): ``ffprobe`` is run once in
    the constructor and its JSON description of the first stream whose
    ``codec_type`` is ``'video'`` is kept in ``self.video_stream``.
    """

    # Keys accepted by ``__getitem__``. A key listed here may still be absent
    # from a particular stream, in which case the lookup raises KeyError.
    INFO_KEYS = [
        'index', 'codec_name', 'codec_long_name', 'profile', 'codec_type',
        'codec_time_base', 'codec_tag_string', 'codec_tag', 'width', 'height',
        'coded_width', 'coded_height', 'has_b_frames', 'pix_fmt', 'level',
        'chroma_location', 'refs', 'is_avc', 'nal_length_size', 'r_frame_rate',
        'avg_frame_rate', 'time_base', 'start_pts', 'start_time',
        'duration_ts', 'duration', 'bit_rate', 'bits_per_raw_sample',
        'nb_frames', 'disposition', 'tags'
    ]

    def __init__(self,
                 input_path: str,
                 logger: Union[None, str, logging.Logger] = None) -> None:
        """Probe ``input_path`` with ffprobe and store its video stream info.

        Args:
            input_path (str): Path to the video (or image) file.
            logger (Union[None, str, logging.Logger], optional):
                Logger for logging. If None, root logger will be selected.
                Defaults to None.

        Raises:
            FileNotFoundError: presumably raised by ``check_path`` when the
                input path is invalid (that helper is defined elsewhere).
            ValueError: ffprobe failed, produced unparsable output, or the
                file contains no video stream.
        """
        self.logger = get_logger(logger)
        check_path(
            input_path=input_path,
            allowed_existence=[Existence.FileExist],
            allowed_suffix=[
                '.mp4', '.mkv', '.avi', '.gif', '.png', '.jpg', '.jpeg'
            ],
            path_type='file',
            logger=logger)
        cmd = [
            'ffprobe', '-show_format', '-show_streams', '-of', 'json',
            input_path
        ]
        process = subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = process.communicate()
        # Surface ffprobe's own diagnostics instead of failing later with an
        # unrelated JSON/KeyError.
        if process.returncode != 0:
            detail = err.decode('utf-8', errors='replace').strip()
            message = f'ffprobe failed on {input_path}: {detail}'
            self.logger.error(message)
            raise ValueError(message)
        # json.JSONDecodeError is a ValueError subclass, so malformed output
        # is reported with the same exception family.
        probe = json.loads(out.decode('utf-8'))
        video_stream = next((stream for stream in probe.get('streams', [])
                             if stream.get('codec_type') == 'video'), None)
        if video_stream is None:
            message = f'No video stream found in {input_path}'
            self.logger.error(message)
            raise ValueError(message)
        self.video_stream = video_stream

    def __getitem__(self, key: str) -> Any:
        """Get the corresponding information according to the key.

        Args:
            key (str):
                A key in VideoInfoReader.INFO_KEYS
                Such as codec_name, pix_fmt, duration, etc.

        Raises:
            KeyError: key is not in VideoInfoReader.INFO_KEYS, or the probed
                stream does not carry that key.

        Returns:
            Any: The expected information.
        """
        if key not in self.__class__.INFO_KEYS:
            message = (f'Wrong video info key {key!r}. '
                       f'Select one key from {self.__class__.INFO_KEYS}')
            self.logger.error(message)
            raise KeyError(message)
        return self.video_stream[key]
class VideoWriter:
    """Stream raw BGR frames to an ffmpeg subprocess that encodes an mp4."""

    def __init__(self,
                 output_path: str,
                 resolution: Tuple[int, int],
                 fps: float = 30.0,
                 n_frames: int = int(1e9),
                 disable_log: bool = False,
                 logger: Union[None, str, logging.Logger] = None) -> None:
        """Write video file by ffmpeg.

        Args:
            output_path (str):
                Path to the output video file, which shall end with '.mp4'.
            resolution (Tuple[int, int]):
                Resolution of the video, [height, width]. Odd values are
                rounded up to the next even number for the ffmpeg command;
                frames passed to ``write`` are sent unchanged, so they must
                already have that even size.
            fps (float, optional):
                Frame per second. Defaults to 30.0.
            n_frames (int, optional):
                Limit number of frames in this video.
                Defaults to 1e9.
            disable_log (bool, optional):
                Whether to disable logs of info level.
                Defaults to False.
            logger (Union[None, str, logging.Logger], optional):
                Logger for logging. If None, root logger will be selected.
                Defaults to None.

        Raises:
            BrokenPipeError: No buffer received.
        """
        self.logger = get_logger(logger)
        prepare_output_path(
            output_path,
            allowed_suffix=['.mp4'],
            tag='output video',
            path_type='file',
            overwrite=True,
            logger=logger)
        height, width = resolution
        # libx264 rejects odd dimensions, so round both up to even.
        width += width % 2
        height += height % 2
        command = [
            'ffmpeg',
            '-y',  # (optional) overwrite output file if it exists
            '-f',
            'rawvideo',
            '-pix_fmt',
            'bgr24',
            '-s',
            f'{int(width)}x{int(height)}',
            '-r',
            f'{fps}',  # frames per second
            '-loglevel',
            'error',
            '-threads',
            '1',
            '-i',
            '-',  # The input comes from a pipe
            '-vcodec',
            'libx264',
            '-r',
            f'{fps}',  # frames per second
            '-an',  # Tells FFMPEG not to expect any audio
            output_path,
        ]
        if not disable_log:
            self.logger.info(f'Running \"{" ".join(command)}\"')
        process = subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if process.stdin is None or process.stderr is None:
            self.logger.error('No buffer received.')
            raise BrokenPipeError
        self.process = process
        self.n_frames = n_frames
        self.len = 0

    def write(self, image_array: np.ndarray):
        """Write an image to the video file.

        Frames beyond ``n_frames`` are ignored; the writer closes itself as
        soon as the limit is reached.

        Args:
            image_array (np.ndarray):
                An array of a single image, in shape
                [H, W, C].
        """
        if self.len < self.n_frames:
            try:
                self.process.stdin.write(image_array.tobytes())
                self.len += 1
            except KeyboardInterrupt:
                # Existing behaviour: finalize the file on Ctrl-C instead of
                # propagating the interrupt.
                self.close()
        if self.len >= self.n_frames:
            self.close()

    def close(self):
        """Manually close this video writer.

        Safe to call repeatedly, and safe on an instance whose constructor
        failed before the ffmpeg process was started.
        """
        process = getattr(self, 'process', None)
        if process is None:
            return
        if process.stdin is not None:
            process.stdin.close()
        if process.stderr is not None:
            process.stderr.close()
        process.wait()

    def __del__(self):
        """Release the ffmpeg process when the writer is garbage collected."""
        self.close()
def pad_for_libx264(image_array: np.ndarray) -> np.ndarray:
    """Zero-pad height and width up to the next even number.

    libx264 refuses odd sizes with an error such as
    \"[libx264 @ 0x1b1d560] width not divisible by 2 \"

    Args:
        image_array (np.ndarray):
            Image or images load by cv2.imread().
            Possible shapes:
            1. [height, width]
            2. [height, width, channels]
            3. [images, height, width]
            4. [images, height, width, channels]
            A 3-dim array is read as layout 2 only when its last axis has
            length 3; any other 3-dim array is read as layout 3.

    Returns:
        np.ndarray:
            A image with both edges divisible by 2. Arrays that are already
            even, or whose number of dimensions is not 2, 3 or 4, are
            returned unchanged.
    """
    n_dims = image_array.ndim
    single_image = n_dims == 2 or \
        (n_dims == 3 and image_array.shape[2] == 3)
    if single_image:
        spatial_axes = (0, 1)
    elif n_dims in (3, 4):
        # Leading axis enumerates the images.
        spatial_axes = (1, 2)
    else:
        return image_array
    # One extra row/column at the far edge of each odd spatial axis.
    padding = [(0, 0)] * n_dims
    for axis in spatial_axes:
        padding[axis] = (0, image_array.shape[axis] % 2)
    if not any(after for _, after in padding):
        return image_array
    return np.pad(
        image_array, padding, mode='constant', constant_values=0)
def video_to_array(
        input_path: str,
        resolution: Union[Tuple[int, int], Tuple[float, float]] = None,
        start: int = 0,
        end: int = None,
        disable_log: bool = False,
        logger: Union[None, str, logging.Logger] = None) -> np.ndarray:
    """Read a video/gif as an array of (f * h * w * 3).

    Args:
        input_path (str): input path.
        resolution (Union[Tuple[int, int], Tuple[float, float]], optional):
            resolution(height, width) of output. Float values are truncated
            to int. Defaults to None, which keeps the probed stream size.
        start (int, optional): start frame index. Inclusive.
            If < 0, will be converted to frame_index range in [0, n_frame].
            Defaults to 0.
        end (int, optional): end frame index. Exclusive.
            Could be positive int or negative int or None.
            Values larger than the frame count are clamped to it.
            If None, all frames from start till the last frame are included.
            Defaults to None.
        disable_log (bool, optional): whether close the ffmepg command info.
            Defaults to False.
        logger (Union[None, str, logging.Logger], optional):
            Logger for logging. If None, root logger will be selected.
            Defaults to None.

    Raises:
        FileNotFoundError: check the input path (presumably raised by the
            path check performed inside VideoInfoReader).
        BrokenPipeError: ffmpeg's stdout pipe could not be opened.
        ValueError: no frame could be decoded (raised by np.concatenate).

    Returns:
        np.ndarray: shape will be (f * h * w * 3).
    """
    info = VideoInfoReader(input_path, logger=logger)
    if resolution:
        # Cast so that a float resolution works with read()/reshape() below.
        height, width = int(resolution[0]), int(resolution[1])
    else:
        width, height = int(info['width']), int(info['height'])
    n_frames = int(info['nb_frames'])
    start = (min(start, n_frames - 1) + n_frames) % n_frames
    if end is None:
        end = n_frames
    elif end < 0:
        end = (end + n_frames) % n_frames
    else:
        # `end` is exclusive, so n_frames itself is a valid upper bound;
        # clamping to n_frames - 1 would silently drop the last frame.
        end = min(end, n_frames)
    command = [
        'ffmpeg',
        '-i',
        input_path,
        '-filter_complex',
        f'[0]trim=start_frame={start}:end_frame={end}[v0]',
        '-map',
        '[v0]',
        '-pix_fmt',
        'bgr24',  # bgr24 for matching OpenCV
        '-s',
        f'{int(width)}x{int(height)}',
        '-f',
        'image2pipe',
        '-vcodec',
        'rawvideo',
        '-loglevel',
        'error',
        'pipe:'
    ]
    if not disable_log:
        logger = get_logger(logger)
        logger.info(f'Running \"{" ".join(command)}\"')
    # Execute FFmpeg as sub-process with stdout as a pipe
    process = subprocess.Popen(command, stdout=subprocess.PIPE, bufsize=10**8)
    if process.stdout is None:
        raise BrokenPipeError('No buffer received.')
    frame_bytes = width * height * 3
    array = []
    try:
        # Read decoded video frames from the PIPE until no more frames to
        # read: a short read marks the end of the FFmpeg stream.
        while True:
            buffer = process.stdout.read(frame_bytes)
            if len(buffer) != frame_bytes:
                break
            img = np.frombuffer(buffer, np.uint8).reshape(height, width, 3)
            array.append(img[np.newaxis])
    finally:
        # Always reap the subprocess, even if decoding raised.
        process.stdout.close()
        process.wait()
    return np.concatenate(array)
def images_to_array_opencv(
        input_folder: str,
        resolution: Union[Tuple[int, int], Tuple[float, float]] = None,
        img_format: Union[str, None] = None,
        start: int = 0,
        end: int = None,
        logger: Union[None, str, logging.Logger] = None) -> np.ndarray:
    """Read a folder of images as an array of (f * h * w * 3).

    Args:
        input_folder (str): folder of input images.
        resolution (Union[Tuple[int, int], Tuple[float, float]]):
            resolution(height, width) of output. Float values are truncated
            to int. If None, the size of the first selected image is used
            and every later image is resized to it. Defaults to None.
        img_format (str, optional):
            Format of images to be read, 'jpg' or 'png'.
            If None, every '.jpg', '.jpeg' and '.png' file is read.
            Defaults to None.
        start (int, optional): start frame index. Inclusive.
            Applied as a list slice on the sorted file names, so negative
            values count from the end.
            Defaults to 0.
        end (int, optional): end frame index. Exclusive.
            Could be positive int or negative int or None.
            If None, all frames from start till the last frame are included.
            Defaults to None.
        logger (Union[None, str, logging.Logger], optional):
            Logger for logging. If None, root logger will be selected.
            Defaults to None.

    Raises:
        FileNotFoundError: check the input path (presumably raised by
            check_path, which is defined elsewhere).
        OSError: an image file could not be read or decoded by OpenCV.

    Returns:
        np.ndarray: shape will be (f * h * w * 3).
    """
    check_path(
        input_path=input_folder,
        allowed_existence=[Existence.DirectoryExistNotEmpty],
        allowed_suffix=[''],
        path_type='dir',
        logger=logger)
    if img_format is None:
        candidates = (
            os.path.join(input_folder, name)
            for name in sorted(os.listdir(input_folder)))
        frame_list = [
            path for path in candidates
            if check_path_suffix(path, ['.jpg', '.jpeg', '.png'])
        ]
    else:
        frame_list = sorted(
            glob.glob(os.path.join(input_folder, f'*.{img_format}')))
    # A slice with end=None runs to the last frame.
    frame_list = frame_list[start:end]
    if resolution is None:
        target_size = None
    else:
        # cv2.resize only accepts integer sizes.
        target_size = (int(resolution[0]), int(resolution[1]))
    array_list = []
    for frame_path in frame_list:
        img = cv2.imread(frame_path)
        if img is None:
            # cv2.imread signals failure by returning None, not by raising.
            raise OSError(f'Failed to read image: {frame_path}')
        if target_size is None:
            # The first image defines the size and is kept as it is.
            target_size = img.shape[0:2]
        else:
            img = cv2.resize(img, (target_size[1], target_size[0]))
        array_list.append(img)
    return np.asarray(array_list)
def images_to_array(
        input_folder: str,
        resolution: Union[Tuple[int, int], Tuple[float, float]] = None,
        img_format: str = '%06d.png',
        start: int = 0,
        end: int = None,
        remove_raw_files: bool = False,
        disable_log: bool = False,
        logger: Union[None, str, logging.Logger] = None) -> np.ndarray:
    """Read a folder of images as an array of (f * h * w * 3).

    Args:
        input_folder (str): folder of input images.
        resolution (Union[Tuple[int, int], Tuple[float, float]]):
            resolution(height, width) of output. Float values are truncated
            to int. Defaults to None, which keeps the size probed from the
            first selected image.
        img_format (str, optional): format of images to be read.
            If None, the images are first copied into a temporary sibling
            folder ``<input_folder>_temp`` under sequential names, and that
            folder is removed again before returning.
            Defaults to '%06d.png'.
        start (int, optional): start frame index. Inclusive.
            If < 0, it counts from the end of the sequence.
            Defaults to 0.
        end (int, optional): end frame index. Exclusive.
            Could be positive int or negative int or None.
            If None, all frames from start till the last frame are included.
            Defaults to None.
        remove_raw_files (bool, optional): whether remove raw images.
            The folder is only removed after the frames were decoded.
            Defaults to False.
        disable_log (bool, optional): whether close the ffmepg command info.
            Defaults to False.
        logger (Union[None, str, logging.Logger], optional):
            Logger for logging. If None, root logger will be selected.
            Defaults to None.

    Raises:
        FileNotFoundError: check the input path (presumably raised by
            check_path / VideoInfoReader, which are defined elsewhere).
        BrokenPipeError: ffmpeg's stdout pipe could not be opened.
        ValueError: no frame could be decoded (raised by np.concatenate).

    Returns:
        np.ndarray: shape will be (f * h * w * 3).
    """
    check_path(
        input_path=input_folder,
        allowed_existence=[Existence.DirectoryExistNotEmpty],
        allowed_suffix=[''],
        path_type='dir',
        logger=logger)
    # Remember the caller's folder: `input_folder` is re-pointed at the
    # temporary copy below, but `remove_raw_files` refers to the originals.
    raw_input_folder = input_folder
    temp_input_folder = None
    array = []
    try:
        if img_format is None:
            input_folder_info = Path(input_folder)
            temp_input_folder = os.path.join(
                input_folder_info.parent, input_folder_info.name + '_temp')
            img_format = images_to_sorted_images(
                input_folder=input_folder, output_folder=temp_input_folder)
            input_folder = temp_input_folder
        n_frames = len(os.listdir(input_folder))
        # Normalise the range before it is used to build any file name.
        if start < 0:
            start = max(start + n_frames, 0)
        start = start % n_frames
        if end is None:
            end = n_frames
        elif end < 0:
            # end=-1 means "all but the last frame", as in a Python slice.
            end = max(end + n_frames, 0)
        else:
            end = min(end, n_frames)
        # Probing the first selected frame validates it and yields its size.
        info = VideoInfoReader(
            f'{input_folder}/{img_format % start}', logger=logger)
        if resolution:
            height, width = int(resolution[0]), int(resolution[1])
        else:
            width, height = int(info['width']), int(info['height'])
        command = [
            'ffmpeg',
            '-y',
            '-threads',
            '1',
            '-start_number',
            f'{start}',
            '-i',
            f'{input_folder}/{img_format}',
            '-frames:v',
            f'{end - start}',
            '-f',
            'rawvideo',
            '-pix_fmt',
            'bgr24',  # bgr24 for matching OpenCV
            '-s',
            f'{int(width)}x{int(height)}',
            '-loglevel',
            'error',
            '-'
        ]
        if not disable_log:
            logger = get_logger(logger)
            logger.info(f'Running \"{" ".join(command)}\"')
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, bufsize=10**8)
        if process.stdout is None:
            raise BrokenPipeError('No buffer received.')
        frame_bytes = width * height * 3
        try:
            # Read decoded frames from the PIPE until a short read marks
            # the end of the FFmpeg stream.
            while True:
                buffer = process.stdout.read(frame_bytes)
                if len(buffer) != frame_bytes:
                    break
                img = np.frombuffer(buffer,
                                    np.uint8).reshape(height, width, 3)
                array.append(img[np.newaxis])
        finally:
            process.stdout.close()
            process.wait()
    finally:
        # The temporary copy must not outlive this call, even on failure.
        if temp_input_folder is not None and\
                os.path.isdir(temp_input_folder):
            shutil.rmtree(temp_input_folder)
    # Concatenate first: if nothing was decoded this raises and the raw
    # images are left untouched.
    frames = np.concatenate(array)
    if remove_raw_files and\
            os.path.isdir(raw_input_folder):
        shutil.rmtree(raw_input_folder)
    return frames
def images_to_sorted_images(input_folder, output_folder, img_format='%06d'):
    """Copy and rename a folder of images into a new folder following the
    `img_format`.

    Every ``*.png`` and ``*.jpg`` file directly inside ``input_folder`` is
    copied, in sorted path order, to ``output_folder`` under the name
    ``img_format % index`` with index starting at 0.

    NOTE: if the folder mixes png and jpg files, all copies receive the
    'jpg' extension regardless of their content (behaviour kept from the
    previous implementation).

    Args:
        input_folder (str): input folder.
        output_folder (str): output folder, created if missing.
        img_format (str, optional): image format name, do not need extension.
            Defaults to '%06d'.

    Raises:
        FileNotFoundError: input_folder holds neither png nor jpg files.

    Returns:
        str: image format of the rename images.
    """
    # Drop any extension the caller supplied; it is re-derived below.
    img_format = img_format.rsplit('.', 1)[0]
    file_list = []
    ext = None
    for candidate in ('png', 'jpg'):
        matches = glob.glob(os.path.join(input_folder, f'*.{candidate}'))
        if matches:
            ext = candidate
            file_list.extend(matches)
    if ext is None:
        # Without this guard the function failed with an unbound `ext`.
        raise FileNotFoundError(
            f'No .png or .jpg images found in {input_folder}')
    os.makedirs(output_folder, exist_ok=True)
    file_list.sort()
    for index, file_name in enumerate(file_list):
        shutil.copy(
            file_name,
            os.path.join(output_folder, (img_format + '.%s') % (index, ext)))
    return img_format + '.%s' % ext
def array_to_video(image_array: np.ndarray,
                   output_path: str,
                   fps: Union[int, float] = 30,
                   resolution: Union[Tuple[int, int], Tuple[float,
                                                            float]] = None,
                   disable_log: bool = False,
                   logger: Union[None, str, logging.Logger] = None) -> None:
    """Convert an array to a video directly, gif not supported.

    Args:
        image_array (np.ndarray): shape should be (f * h * w * 3).
            The raw bytes are piped to ffmpeg as bgr24, i.e. one byte per
            channel — assumes a uint8 array; TODO confirm against callers.
        output_path (str): output video file path.
        fps (Union[int, float], optional): fps. Defaults to 30.
        resolution (Optional[Union[Tuple[int, int], Tuple[float, float]]], \
            optional): (height, width) of the output video. Odd values are
            rounded up to even; if the result differs from the (padded)
            frame size, ffmpeg rescales the frames. Defaults to None.
        disable_log (bool, optional): whether close the ffmepg command info.
            Defaults to False.
        logger (Union[None, str, logging.Logger], optional):
            Logger for logging. If None, root logger will be selected.
            Defaults to None.

    Raises:
        FileNotFoundError: check output path (presumably raised by
            prepare_output_path, which is defined elsewhere).
        TypeError: check input array.
        BrokenPipeError: ffmpeg's pipes could not be opened.

    Returns:
        None.
    """
    if not isinstance(image_array, np.ndarray):
        raise TypeError('Input should be np.ndarray.')
    assert image_array.ndim == 4
    assert image_array.shape[-1] == 3
    prepare_output_path(
        output_path,
        allowed_suffix=['.mp4'],
        tag='output video',
        path_type='file',
        overwrite=True,
        logger=logger)
    logger = get_logger(logger)
    # The size announced for the raw input must be the real size of the
    # frames that are written, whatever output resolution is requested.
    image_array = pad_for_libx264(image_array)
    height, width = image_array.shape[1], image_array.shape[2]
    output_size_args = []
    if resolution:
        out_height, out_width = int(resolution[0]), int(resolution[1])
        # libx264 rejects odd dimensions, so round both up to even.
        out_width += out_width % 2
        out_height += out_height % 2
        if (out_height, out_width) != (height, width):
            # As an output option, -s makes ffmpeg scale the frames.
            output_size_args = ['-s', f'{out_width}x{out_height}']
    command = [
        'ffmpeg',
        '-y',  # (optional) overwrite output file if it exists
        '-f',
        'rawvideo',
        '-s',
        f'{int(width)}x{int(height)}',  # size of one frame
        '-pix_fmt',
        'bgr24',
        '-r',
        f'{fps}',  # frames per second
        '-loglevel',
        'error',
        '-threads',
        '4',
        '-i',
        '-',  # The input comes from a pipe
        '-vcodec',
        'libx264',
        *output_size_args,
        '-an',  # Tells FFMPEG not to expect any audio
        output_path,
    ]
    if not disable_log:
        logger.info(f'Running \"{" ".join(command)}\"')
    process = subprocess.Popen(
        command,
        stdin=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if process.stdin is None or process.stderr is None:
        raise BrokenPipeError('No buffer received.')
    for frame in image_array:
        process.stdin.write(frame.tobytes())
    # Closing stdin tells ffmpeg the stream is complete.
    process.stdin.close()
    # With -loglevel error anything on stderr is a real problem: report it
    # instead of discarding it.
    error_output = process.stderr.read()
    process.stderr.close()
    return_code = process.wait()
    if return_code != 0:
        detail = error_output.decode('utf-8', errors='replace').strip()
        logger.error(f'ffmpeg exited with code {return_code}: {detail}')
def array_to_images(image_array: np.ndarray,
                    output_folder: str,
                    img_format: str = '%06d.png',
                    resolution: Union[Tuple[int, int], Tuple[float,
                                                             float]] = None,
                    disable_log: bool = False,
                    logger: Union[None, str, logging.Logger] = None) -> None:
    """Convert an array to images directly.

    Args:
        image_array (np.ndarray): shape should be (f * h * w * 3).
            The raw bytes are piped to ffmpeg as bgr24, i.e. one byte per
            channel — assumes a uint8 array; TODO confirm against callers.
        output_folder (str): output folder for the images.
        img_format (str, optional): format of the images.
            Defaults to '%06d.png'.
        resolution (Optional[Union[Tuple[int, int], Tuple[float, float]]], \
            optional): (height, width) of the output images. Not supported
            yet: any truthy value raises NotImplementedError.
            Defaults to None.
        disable_log (bool, optional): whether close the ffmepg command info.
            Defaults to False.
        logger (Union[None, str, logging.Logger], optional):
            Logger for logging. If None, root logger will be selected.
            Defaults to None.

    Raises:
        FileNotFoundError: check output folder (presumably raised by
            prepare_output_path, which is defined elsewhere).
        TypeError: check input array.
        NotImplementedError: a resolution was requested.
        BrokenPipeError: ffmpeg's pipes could not be opened.

    Returns:
        None
    """
    # Validate the arguments before the output folder is touched;
    # prepare_output_path is called with overwrite=True below.
    if not isinstance(image_array, np.ndarray):
        raise TypeError('Input should be np.ndarray.')
    assert image_array.ndim == 4
    assert image_array.shape[-1] == 3
    prepare_output_path(
        output_folder,
        allowed_suffix=[],
        tag='output image folder',
        path_type='dir',
        overwrite=True,
        logger=logger)
    logger = get_logger(logger)
    if resolution:
        logger.error('Resolution not correctly implemented.')
        raise NotImplementedError
    height, width = image_array.shape[1], image_array.shape[2]
    command = [
        'ffmpeg',
        '-y',  # (optional) overwrite output file if it exists
        '-f',
        'rawvideo',
        '-s',
        f'{int(width)}x{int(height)}',  # size of one frame
        '-pix_fmt',
        'bgr24',  # bgr24 for matching OpenCV
        '-loglevel',
        'error',
        '-threads',
        '4',
        '-i',
        '-',  # The input comes from a pipe
        '-f',
        'image2',
        '-start_number',
        '0',
        os.path.join(output_folder, img_format),
    ]
    if not disable_log:
        logger.info(f'Running \"{" ".join(command)}\"')
    process = subprocess.Popen(
        command,
        stdin=subprocess.PIPE,
        stderr=subprocess.PIPE,
        bufsize=10**8,
        close_fds=True)
    if process.stdin is None or process.stderr is None:
        raise BrokenPipeError('No buffer received.')
    for frame in image_array:
        process.stdin.write(frame.tobytes())
    # Closing stdin tells ffmpeg the stream is complete.
    process.stdin.close()
    # With -loglevel error anything on stderr is a real problem: report it
    # instead of discarding it.
    error_output = process.stderr.read()
    process.stderr.close()
    return_code = process.wait()
    if return_code != 0:
        detail = error_output.decode('utf-8', errors='replace').strip()
        logger.error(f'ffmpeg exited with code {return_code}: {detail}')