Source code for xrprimer.utils.ffmpeg_utils

# yapf: disable
import glob
import json
import logging
import os
import shutil
import subprocess
from pathlib import Path
from typing import Any, Tuple, Union

import numpy as np

from .log_utils import get_logger
from .path_utils import (
    Existence,
    check_path,
    check_path_suffix,
    prepare_output_path,
)

# yapf: enable


class VideoInfoReader():
    INFO_KEYS = [
        'index', 'codec_name', 'codec_long_name', 'profile', 'codec_type',
        'codec_time_base', 'codec_tag_string', 'codec_tag', 'width', 'height',
        'coded_width', 'coded_height', 'has_b_frames', 'pix_fmt', 'level',
        'chroma_location', 'refs', 'is_avc', 'nal_length_size', 'r_frame_rate',
        'avg_frame_rate', 'time_base', 'start_pts', 'start_time',
        'duration_ts', 'duration', 'bit_rate', 'bits_per_raw_sample',
        'nb_frames', 'disposition', 'tags'
    ]

    def __init__(self,
                 input_path: str,
                 logger: Union[None, str, logging.Logger] = None) -> None:
        """Get video information from video, mimiced from ffmpeg-python.
        https://github.com/kkroening/ffmpeg-python.

        Args:
            vid_file (str): Path to the video file.
            logger (Union[None, str, logging.Logger], optional):
                Logger for logging. If None, root logger will be selected.
                Defaults to None.

        Raises:
            FileNotFoundError: check the input path.

        Returns:
            None.
        """
        self.logger = get_logger(logger)
        check_path(
            input_path=input_path,
            allowed_existence=[Existence.FileExist],
            allowed_suffix=[
                '.mp4', '.mkv', '.avi', '.gif', '.png', '.jpg', '.jpeg'
            ],
            path_type='file',
            logger=logger)
        cmd = [
            'ffprobe', '-show_format', '-show_streams', '-of', 'json',
            input_path
        ]
        process = subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, _ = process.communicate()
        probe = json.loads(out.decode('utf-8'))
        video_stream = next((stream for stream in probe['streams']
                             if stream['codec_type'] == 'video'), None)
        if video_stream is None:
            self.logger.error('No video stream found')
            raise ValueError
        self.video_stream = video_stream

    def __getitem__(self, key: str) -> Any:
        """Get the corresponding information according to the key.

        Args:
            key (str):
                A key in VideoInfoReader.INFO_KEYS
                Such as codec_name, pix_fmt, duration, etc.

        Raises:
            KeyError: key cannot be found in VideoInfoReader.INFO_KEYS

        Returns:
            Any: The expected information.
        """
        if key not in self.__class__.INFO_KEYS:
            self.logger.error(
                'Wronge vid info key.' +
                f'Select one key from {self.__class__.INFO_KEYS}')
            raise KeyError
        return self.video_stream[key]


class VideoReader:
    """VideoReader for reading video frames as an image ndarray for OpenCV.

    It opens a PIPE from ffmpeg command and reads one frame per call, friendly
    for poor RAM devices.
    """

    def __init__(self,
                 input_path: str,
                 resolution: Union[Tuple[int, int], Tuple[float,
                                                          float]] = None,
                 start: int = 0,
                 end: int = None,
                 disable_log: bool = False,
                 logger: Union[None, str, logging.Logger] = None) -> None:
        """
        Args:
            input_path (str): Input path.
            resolution (Union[Tuple[int, int], Tuple[float, float]],
                    optional):
                Resolution(height, width) of output.
                Defaults to None.
            start (int, optional):
                Start frame index. Inclusive.
                If < 0, will be converted to frame_index range in [0, n_frame].
                Defaults to 0.
            end (int, optional):
                End frame index. Exclusive.
                Could be positive int or negative int or None.
                If None, all frames from start till the last
                frame are included.
                Defaults to None.
            disable_log (bool, optional):
                Whether close the ffmepg command info.
                Defaults to False.
            logger (Union[None, str, logging.Logger], optional):
                Logger for logging. If None, root logger will be selected.
                Defaults to None.

        Raises:
            BrokenPipeError: No buffer received from ffmpeg.
        """
        self.logger = get_logger(logger)
        info = VideoInfoReader(input_path, logger=logger)
        if resolution:
            self.height, self.width = resolution
        else:
            self.width, self.height = int(info['width']), int(info['height'])
        n_frames = int(info['nb_frames'])
        start = min(max(0, start), n_frames - 1)
        end = (min(end, n_frames - 1) + n_frames) % n_frames \
            if end is not None \
            else n_frames
        command = [
            'ffmpeg',
            '-i',
            input_path,
            '-filter_complex',
            f'[0]trim=start_frame={start}:end_frame={end}[v0]',
            '-map',
            '[v0]',
            '-pix_fmt',
            'bgr24',  # bgr24 for matching OpenCV
            '-s',
            f'{int(self.width)}x{int(self.height)}',
            '-f',
            'image2pipe',
            '-vcodec',
            'rawvideo',
            '-loglevel',
            'error',
            'pipe:'
        ]
        if not disable_log:
            self.logger.info(f'Running \"{" ".join(command)}\"')
        # Execute FFmpeg as sub-process with stdout as a pipe
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, bufsize=10**8)
        if process.stdout is None:
            self.logger.error('No buffer received.')
            raise BrokenPipeError
        self.process = process
        self.start = start
        self.end = end

    def get_next_frame(self) -> np.ndarray:
        """Read the next frame as opencv image, from the opened video file.

        Args:
            image_array (np.ndarray):
                An array of a single image, in shape
                [H, W, C].
        """
        # Read decoded video frame (in raw video format) from stdout process.
        buffer = self.process.stdout.read(int(self.width * self.height * 3))
        # Return None if buffer length is not W*H*3\
        # (when FFmpeg streaming ends).
        if len(buffer) != self.width * self.height * 3:
            return None
        img = np.frombuffer(buffer, np.uint8).reshape(self.height, self.width,
                                                      3)
        return img

    def __del__(self) -> None:
        self.process.stdout.close()
        self.process.wait()

    def close(self) -> None:
        """Manually close this video writer."""
        self.__del__()


[docs]class VideoWriter: """VideoWriter for writing OpenCV image array to a video file. It opens a PIPE from ffmpeg command and writes one frame per call, friendly for poor RAM devices. """ def __init__(self, output_path: str, resolution: Tuple[int, int], fps: float = 30.0, n_frames: int = 1e9, disable_log: bool = False, logger: Union[None, str, logging.Logger] = None) -> None: """Write video file by ffmpeg. Args: output_path (str): Path to the output video file, which shall end with '.mp4'. resolution (List[int, int]): Resolution of the video, [height, width]. fps (float, optional): Frame per second. Defaults to 30.0. n_frames (int, optional): Limit number of frames in this video. Defaults to 1e9. disable_log (bool, optional): Whether to disable logs of info level. Defaults to False. logger (Union[None, str, logging.Logger], optional): Logger for logging. If None, root logger will be selected. Defaults to None. Raises: BrokenPipeError: No buffer received. """ self.logger = get_logger(logger) prepare_output_path( output_path, allowed_suffix=['.mp4'], tag='output video', path_type='file', overwrite=True) height, width = resolution width += width % 2 height += height % 2 command = [ 'ffmpeg', '-y', # (optional) overwrite output file if it exists '-f', 'rawvideo', '-pix_fmt', 'bgr24', '-s', f'{int(width)}x{int(height)}', '-r', f'{fps}', # frames per second '-loglevel', 'error', '-threads', '1', '-i', '-', # The input comes from a pipe '-vcodec', 'libx264', '-r', f'{fps}', # frames per second '-an', # Tells FFMPEG not to expect any audio output_path, ] if not disable_log: self.logger.info(f'Running \"{" ".join(command)}\"') process = subprocess.Popen( command, stdin=subprocess.PIPE, stderr=subprocess.PIPE, ) if process.stdin is None or process.stderr is None: self.logger.error('No buffer received.') raise BrokenPipeError self.process = process self.n_frames = n_frames self.len = 0
[docs] def write(self, image_array: np.ndarray): """Write an image to the video file. Args: image_array (np.ndarray): An array of a single image, in shape [H, W, C]. """ if self.len < self.n_frames: try: self.process.stdin.write(image_array.tobytes()) self.len += 1 except KeyboardInterrupt: self.__del__() if self.len >= self.n_frames: self.__del__()
def __del__(self): self.process.stdin.close() self.process.stderr.close() self.process.wait()
[docs] def close(self): """Manually close this video writer.""" self.__del__()
[docs]def pad_for_libx264(image_array: np.ndarray) -> np.ndarray: """Pad zeros if width or height of image_array is not divisible by 2. Otherwise you will get. \"[libx264 @ 0x1b1d560] width not divisible by 2 \" Args: image_array (np.ndarray): Image or images load by cv2.imread(). Possible shapes: 1. [height, width] 2. [height, width, channels] 3. [images, height, width] 4. [images, height, width, channels] Returns: np.ndarray: A image with both edges divisible by 2. """ if image_array.ndim == 2 or \ (image_array.ndim == 3 and image_array.shape[2] == 3): hei_index = 0 wid_index = 1 elif image_array.ndim == 4 or \ (image_array.ndim == 3 and image_array.shape[2] != 3): hei_index = 1 wid_index = 2 else: return image_array hei_pad = image_array.shape[hei_index] % 2 wid_pad = image_array.shape[wid_index] % 2 if hei_pad + wid_pad > 0: pad_width = [] for dim_index in range(image_array.ndim): if dim_index == hei_index: pad_width.append((0, hei_pad)) elif dim_index == wid_index: pad_width.append((0, wid_pad)) else: pad_width.append((0, 0)) values = 0 image_array = \ np.pad(image_array, pad_width, mode='constant', constant_values=values) return image_array
[docs]def video_to_array( input_path: str, resolution: Union[Tuple[int, int], Tuple[float, float]] = None, start: int = 0, end: int = None, disable_log: bool = False, logger: Union[None, str, logging.Logger] = None) -> np.ndarray: """Read a video/gif as an array of (f * h * w * 3). Args: input_path (str): input path. resolution (Union[Tuple[int, int], Tuple[float, float]], optional): resolution(height, width) of output. Defaults to None. start (int, optional): start frame index. Inclusive. If < 0, will be converted to frame_index range in [0, n_frame]. Defaults to 0. end (int, optional): end frame index. Exclusive. Could be positive int or negative int or None. If None, all frames from start till the last frame are included. Defaults to None. disable_log (bool, optional): whether close the ffmepg command info. Defaults to False. logger (Union[None, str, logging.Logger], optional): Logger for logging. If None, root logger will be selected. Defaults to None. Raises: FileNotFoundError: check the input path. Returns: np.ndarray: shape will be (f * h * w * 3). """ info = VideoInfoReader(input_path, logger=logger) if resolution: height, width = resolution else: width, height = int(info['width']), int(info['height']) n_frames = int(info['nb_frames']) start = max(start, 0) % (n_frames + 1) end = min(end, n_frames) % (n_frames + 1) if end is not None else n_frames command = [ 'ffmpeg', '-i', input_path, '-filter_complex', f'[0]trim=start_frame={start}:end_frame={end}[v0]', '-map', '[v0]', '-pix_fmt', 'bgr24', # bgr24 for matching OpenCV '-s', f'{int(width)}x{int(height)}', '-f', 'image2pipe', '-vcodec', 'rawvideo', '-loglevel', 'error', 'pipe:' ] if not disable_log: logger = get_logger(logger) logger.info(f'Running \"{" ".join(command)}\"') # Execute FFmpeg as sub-process with stdout as a pipe process = subprocess.Popen(command, stdout=subprocess.PIPE, bufsize=10**8) if process.stdout is None: raise BrokenPipeError('No buffer received.') # Read decoded video frames from the PIPE until no more frames to read array = [] while True: # Read decoded video frame (in raw video format) from stdout process. buffer = process.stdout.read(int(width * height * 3)) # Break the loop if buffer length is not W*H*3\ # (when FFmpeg streaming ends). if len(buffer) != width * height * 3: break img = np.frombuffer(buffer, np.uint8).reshape(height, width, 3) array.append(img[np.newaxis]) process.stdout.flush() process.stdout.close() process.wait() return np.concatenate(array)
[docs]def images_to_array_opencv( input_folder: str, resolution: Union[Tuple[int, int], Tuple[float, float]] = None, img_format: Union[str, None] = None, start: int = 0, end: int = None, logger: Union[None, str, logging.Logger] = None) -> np.ndarray: """Read a folder of images as an array of (f * h * w * 3). Args: input_folder (str): folder of input images. resolution (Union[Tuple[int, int], Tuple[float, float]]): resolution(height, width) of output. Defaults to None. img_format (str, optional): Format of images to be read, 'jpg' or 'png'. Defaults to None. start (int, optional): start frame index. Inclusive. If < 0, will be converted to frame_index range in [0, n_frame]. Defaults to 0. end (int, optional): end frame index. Exclusive. Could be positive int or negative int or None. If None, all frames from start till the last frame are included. Defaults to None. logger (Union[None, str, logging.Logger], optional): Logger for logging. If None, root logger will be selected. Defaults to None. Raises: FileNotFoundError: check the input path. Returns: np.ndarray: shape will be (f * h * w * 3). """ import cv2 check_path( input_path=input_folder, allowed_existence=[Existence.DirectoryExistNotEmpty], allowed_suffix=[''], path_type='dir', logger=logger) if img_format is None: frame_list = [] frame_names = sorted(os.listdir(input_folder)) for name in frame_names: abs_path = os.path.join(input_folder, name) if check_path_suffix(abs_path, ['.jpg', '.jpeg', '.png']): frame_list.append(abs_path) else: frame_list = sorted( glob.glob(os.path.join(input_folder, f'*.{img_format}'))) if end is None: frame_list = frame_list[start:] else: frame_list = frame_list[start:end] array_list = [] for index, frame_path in enumerate(frame_list): img = cv2.imread(frame_path) if index == 0 and resolution is None: resolution = img.shape[0:2] else: img = cv2.resize(img, (resolution[1], resolution[0])) array_list.append(img) return np.asarray(array_list)
[docs]def images_to_array( input_folder: str, resolution: Union[Tuple[int, int], Tuple[float, float]] = None, img_format: str = '%06d.png', start: int = 0, end: int = None, remove_raw_files: bool = False, disable_log: bool = False, logger: Union[None, str, logging.Logger] = None) -> np.ndarray: """Read a folder of images as an array of (f * h * w * 3). Args: input_folder (str): folder of input images. resolution (Union[Tuple[int, int], Tuple[float, float]]): resolution(height, width) of output. Defaults to None. img_format (str, optional): format of images to be read. Defaults to '%06d.png'. start (int, optional): start frame index. Inclusive. If < 0, will be converted to frame_index range in [0, n_frame]. Defaults to 0. end (int, optional): end frame index. Exclusive. Could be positive int or negative int or None. If None, all frames from start till the last frame are included. Defaults to None. remove_raw_files (bool, optional): whether remove raw images. Defaults to False. disable_log (bool, optional): whether close the ffmepg command info. Defaults to False. Raises: FileNotFoundError: check the input path. Returns: np.ndarray: shape will be (f * h * w * 3). """ check_path( input_path=input_folder, allowed_existence=[Existence.DirectoryExistNotEmpty], allowed_suffix=[''], path_type='dir', logger=logger) input_folder_info = Path(input_folder) temp_input_folder = None if img_format is None: temp_input_folder = os.path.join(input_folder_info.parent, input_folder_info.name + '_temp') img_format = images_to_sorted_images( input_folder=input_folder, output_folder=temp_input_folder) input_folder = temp_input_folder info = VideoInfoReader(f'{input_folder}/{img_format}' % start) width, height = int(info['width']), int(info['height']) if resolution: height, width = resolution else: width, height = int(info['width']), int(info['height']) n_frames = len(os.listdir(input_folder)) start = max(start, 0) % (n_frames + 1) end = min(end, n_frames) % (n_frames + 1) \ if end is not None else n_frames command = [ 'ffmpeg', '-y', '-threads', '1', '-start_number', f'{start}', '-i', f'{input_folder}/{img_format}', '-frames:v', f'{end - start}', '-f', 'rawvideo', '-pix_fmt', 'bgr24', # bgr24 for matching OpenCV '-s', f'{int(width)}x{int(height)}', '-loglevel', 'error', '-' ] if not disable_log: logger = get_logger(logger) logger.info(f'Running \"{" ".join(command)}\"') process = subprocess.Popen(command, stdout=subprocess.PIPE, bufsize=10**8) if process.stdout is None: raise BrokenPipeError('No buffer received.') # Read decoded video frames from the PIPE until no more frames to read array = [] while True: # Read decoded video frame (in raw video format) from stdout process. buffer = process.stdout.read(int(width * height * 3)) # Break the loop if buffer length is not W*H*3\ # (when FFmpeg streaming ends). if len(buffer) != width * height * 3: break img = np.frombuffer(buffer, np.uint8).reshape(height, width, 3) array.append(img[np.newaxis]) process.stdout.flush() process.stdout.close() process.wait() if temp_input_folder is not None and\ os.path.isdir(temp_input_folder): shutil.rmtree(temp_input_folder) if remove_raw_files and\ os.path.isdir(input_folder): shutil.rmtree(input_folder) return np.concatenate(array)
[docs]def images_to_sorted_images(input_folder, output_folder, img_format='%06d'): """Copy and rename a folder of images into a new folder following the `img_format`. Args: input_folder (str): input folder. output_folder (str): output folder. img_format (str, optional): image format name, do not need extension. Defaults to '%06d'. Returns: str: image format of the rename images. """ img_format = img_format.rsplit('.', 1)[0] file_list = [] os.makedirs(output_folder, exist_ok=True) pngs = glob.glob(os.path.join(input_folder, '*.png')) if pngs: ext = 'png' file_list.extend(pngs) jpgs = glob.glob(os.path.join(input_folder, '*.jpg')) if jpgs: ext = 'jpg' file_list.extend(jpgs) file_list.sort() for index, file_name in enumerate(file_list): shutil.copy( file_name, os.path.join(output_folder, (img_format + '.%s') % (index, ext))) return img_format + '.%s' % ext
[docs]def array_to_video(image_array: np.ndarray, output_path: str, fps: Union[int, float] = 30, resolution: Union[Tuple[int, int], Tuple[float, float]] = None, disable_log: bool = False, logger: Union[None, str, logging.Logger] = None) -> None: """Convert an array to a video directly, gif not supported. Args: image_array (np.ndarray): shape should be (f * h * w * 3). output_path (str): output video file path. fps (Union[int, float, optional): fps. Defaults to 30. resolution (Optional[Union[Tuple[int, int], Tuple[float, float]]], \ optional): (height, width) of the output video. Defaults to None. disable_log (bool, optional): whether close the ffmepg command info. Defaults to False. Raises: FileNotFoundError: check output path. TypeError: check input array. Returns: None. """ if not isinstance(image_array, np.ndarray): raise TypeError('Input should be np.ndarray.') assert image_array.ndim == 4 assert image_array.shape[-1] == 3 prepare_output_path( output_path, allowed_suffix=['.mp4'], tag='output video', path_type='file', overwrite=True, logger=logger) if resolution: height, width = resolution width += width % 2 height += height % 2 else: image_array = pad_for_libx264(image_array) height, width = image_array.shape[1], image_array.shape[2] command = [ 'ffmpeg', '-y', # (optional) overwrite output file if it exists '-f', 'rawvideo', '-s', f'{int(width)}x{int(height)}', # size of one frame '-pix_fmt', 'bgr24', '-r', f'{fps}', # frames per second '-loglevel', 'error', '-threads', '4', '-i', '-', # The input comes from a pipe '-vcodec', 'libx264', '-an', # Tells FFMPEG not to expect any audio output_path, ] if not disable_log: logger = get_logger(logger) logger.info(f'Running \"{" ".join(command)}\"') process = subprocess.Popen( command, stdin=subprocess.PIPE, stderr=subprocess.PIPE, ) if process.stdin is None or process.stderr is None: raise BrokenPipeError('No buffer received.') index = 0 while True: if index >= image_array.shape[0]: break process.stdin.write(image_array[index].tobytes()) index += 1 process.stdin.close() process.stderr.close() process.wait()
[docs]def array_to_images(image_array: np.ndarray, output_folder: str, img_format: str = '%06d.png', resolution: Union[Tuple[int, int], Tuple[float, float]] = None, disable_log: bool = False, logger: Union[None, str, logging.Logger] = None) -> None: """Convert an array to images directly. Args: image_array (np.ndarray): shape should be (f * h * w * 3). output_folder (str): output folder for the images. img_format (str, optional): format of the images. Defaults to '%06d.png'. resolution (Optional[Union[Tuple[int, int], Tuple[float, float]]], \ optional): (height, width) of the output images. Defaults to None. disable_log (bool, optional): whether close the ffmepg command info. Defaults to False. Raises: FileNotFoundError: check output folder. TypeError: check input array. Returns: None """ logger = get_logger(logger) prepare_output_path( output_folder, allowed_suffix=[], tag='output image folder', path_type='dir', overwrite=True) if not isinstance(image_array, np.ndarray): raise TypeError('Input should be np.ndarray.') assert image_array.ndim == 4 assert image_array.shape[-1] == 3 if resolution: height, width = resolution logger.error('Resolution not correctly implemented.') raise NotImplementedError else: height, width = image_array.shape[1], image_array.shape[2] command = [ 'ffmpeg', '-y', # (optional) overwrite output file if it exists '-f', 'rawvideo', '-s', f'{int(width)}x{int(height)}', # size of one frame '-pix_fmt', 'bgr24', # bgr24 for matching OpenCV '-loglevel', 'error', '-threads', '4', '-i', '-', # The input comes from a pipe '-f', 'image2', '-start_number', '0', os.path.join(output_folder, img_format), ] if not disable_log: logger = get_logger(logger) logger.info(f'Running \"{" ".join(command)}\"') process = subprocess.Popen( command, stdin=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=10**8, close_fds=True) if process.stdin is None or process.stderr is None: raise BrokenPipeError('No buffer received.') index = 0 while True: if index >= image_array.shape[0]: break process.stdin.write(image_array[index].tobytes()) index += 1 process.stdin.close() process.stderr.close() process.wait()
def images_to_video(input_folder: str, output_path: str, remove_raw_file: bool = False, img_format: str = '%06d.png', fps: Union[int, float] = 30, resolution: Union[Tuple[int, int], Tuple[float, float]] = None, start: int = 0, end: int = None, disable_log: bool = False, logger: Union[None, str, logging.Logger] = None) -> None: """Convert a folder of images to a video. Args: input_folder (str): input image folder output_path (str): output video file path remove_raw_file (bool, optional): whether remove raw images. Defaults to False. img_format (str, optional): format to name the images]. Defaults to '%06d.png'. fps (Union[int, float], optional): output video fps. Defaults to 30. resolution (Optional[Union[Tuple[int, int], Tuple[float, float]]], optional): (height, width) of output. defaults to None. start (int, optional): start frame index. Inclusive. If < 0, will be converted to frame_index range in [0, frame_num]. Defaults to 0. end (int, optional): end frame index. Exclusive. Could be positive int or negative int or None. If None, all frames from start till the last frame are included. Defaults to None. disable_log (bool, optional): whether close the ffmepg command info. Defaults to False. Raises: FileNotFoundError: check the input path. FileNotFoundError: check the output path. Returns: None """ logger = get_logger(logger) check_path( input_path=input_folder, allowed_existence=[Existence.DirectoryExistNotEmpty], allowed_suffix=[''], path_type='dir', logger=logger) prepare_output_path( output_path, allowed_suffix=['.mp4'], tag='output video', path_type='file', overwrite=True, logger=logger) n_frames = len(os.listdir(input_folder)) start = max(start, 0) % (n_frames + 1) end = min(end, n_frames) % (n_frames + 1) \ if end is not None else n_frames command = [ 'ffmpeg', '-y', '-threads', '4', '-start_number', f'{start}', '-r', f'{fps}', '-i', f'{input_folder}/{img_format}', '-frames:v', f'{end - start}', '-profile:v', 'baseline', '-level', '3.0', '-c:v', 'libx264', '-pix_fmt', 'yuv420p', '-an', '-v', 'error', '-loglevel', 'error', output_path, ] if resolution: height, width = resolution width += width % 2 height += height % 2 command.insert(1, '-s') command.insert(2, '%dx%d' % (width, height)) if not disable_log: print(f'Running \"{" ".join(command)}\"') subprocess.call(command) if remove_raw_file: if Path(input_folder).is_dir(): shutil.rmtree(input_folder)