| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import cv2 |
| import numpy as np |
| import argparse |
| from enum import Enum, auto |
| import time |
|
|
|
|
class FrameVis:
    """
    Reads a video file and outputs an image comprised of n resized frames, spread evenly throughout the file.
    """

    # Defaults for visualize(); a 'None' dimension is derived from the
    # source video's frame size at runtime.
    default_frame_height = None
    default_frame_width = None
    default_concat_size = 1  # size (px) of each frame along the concatenation axis
    default_direction = "horizontal"

    def visualize(self, source, nframes, height=default_frame_height, width=default_frame_width,
            direction=default_direction, trim=False, quiet=True):
        """
        Reads a video file and outputs an image comprised of n resized frames, spread evenly throughout the file.

        Parameters:
            source (str): filepath to source video file
            nframes (int): number of frames to process from the video
            height (int): height of each frame, in pixels
            width (int): width of each frame, in pixels
            direction (str): direction to concatenate frames ("horizontal" or "vertical")
            trim (bool): detect and crop hard matting (letterboxing / pillarboxing) before resizing
            quiet (bool): suppress console messages

        Returns:
            visualization image as numpy array

        Raises:
            FileNotFoundError: if the source video cannot be opened
            ValueError: if nframes / height / width / direction are invalid
            IOError: if a frame cannot be read from the video
        """

        video = cv2.VideoCapture(source)  # open video file
        if not video.isOpened():
            raise FileNotFoundError("Source Video Not Found")

        # Always release the capture handle, even when an error is raised
        # mid-processing (previously it leaked on any exception below).
        try:
            if not quiet:
                print("")  # create some space from the invocation line

            # Calculate the sampling interval from the total frame count.
            # Note: OpenCV returns the frame count as a float.
            video_total_frames = video.get(cv2.CAP_PROP_FRAME_COUNT)
            if not isinstance(nframes, int) or nframes < 1:
                raise ValueError("Number of frames must be a positive integer")
            elif nframes > video_total_frames:
                raise ValueError("Requested frame count larger than total available ({})".format(video_total_frames))
            keyframe_interval = video_total_frames / nframes  # frames between samples

            # Read the first frame to learn the source dimensions.
            success, image = video.read()
            if not success:
                raise IOError("Cannot read from video file")

            # Optionally detect hard matting (black bars) to crop away.
            # matte_type bit 0 = letterboxing (top/bottom), bit 1 = pillarboxing (sides).
            matte_type = 0
            if trim == True:
                if not quiet:
                    print("Trimming enabled, checking matting... ", end="", flush=True)

                # Sample 10 frames with a near-black channel threshold of 3.
                success, cropping_bounds = MatteTrimmer.determine_video_bounds(source, 10, 3)

                matte_type = 0
                if success:
                    # Bounds are inclusive coordinates, hence the +1.
                    crop_width = cropping_bounds[1][0] - cropping_bounds[0][0] + 1
                    crop_height = cropping_bounds[1][1] - cropping_bounds[0][1] + 1

                    if crop_height != image.shape[0]:
                        matte_type += 1  # letterboxing present
                    if crop_width != image.shape[1]:
                        matte_type += 2  # pillarboxing present

                if not quiet:
                    if matte_type == 0:
                        print("no matting detected")
                    elif matte_type == 1:
                        print("letterboxing detected, cropping {} px from the top and bottom".format(int((image.shape[0] - crop_height) / 2)))
                    elif matte_type == 2:
                        print("pillarboxing detected, trimming {} px from the sides".format(int((image.shape[1] - crop_width) / 2)))
                    elif matte_type == 3:
                        print("multiple matting detected - cropping ({}, {}) to ({}, {})".format(image.shape[1], image.shape[0], crop_width, crop_height))

            # Resolve the per-frame output height.
            if height is None:
                if direction == "horizontal":
                    # Preserve the source (or cropped) height.
                    if matte_type & 1 == 1:
                        height = crop_height
                    else:
                        height = image.shape[0]
                else:
                    height = FrameVis.default_concat_size
            elif not isinstance(height, int) or height < 1:
                raise ValueError("Frame height must be a positive integer")

            # Resolve the per-frame output width.
            if width is None:
                if direction == "vertical":
                    # Preserve the source (or cropped) width.
                    if matte_type & 2 == 2:
                        width = crop_width
                    else:
                        width = image.shape[1]
                else:
                    width = FrameVis.default_concat_size
            elif not isinstance(width, int) or width < 1:
                raise ValueError("Frame width must be a positive integer")

            # Pick the concatenation function and the final output dimensions.
            if direction == "horizontal":
                concatenate = cv2.hconcat
                output_width = width * nframes
                output_height = height
            elif direction == "vertical":
                concatenate = cv2.vconcat
                output_width = width
                output_height = height * nframes
            else:
                raise ValueError("Invalid direction specified")

            if not quiet:
                aspect_ratio = output_width / output_height
                print("Visualizing \"{}\" - {} by {} ({:.2f}), from {} frames (every {:.2f} seconds)"
                    .format(source, output_width, output_height, aspect_ratio, nframes, FrameVis.interval_from_nframes(source, nframes)))

            # Sample at the midpoint of each interval so frames are spread evenly.
            next_keyframe = keyframe_interval / 2
            finished_frames = 0  # number of frames processed so far
            output_image = None
            progress = ProgressBar("Processing:")

            while finished_frames < nframes:
                video.set(cv2.CAP_PROP_POS_FRAMES, int(next_keyframe))  # seek to the next sample point
                success, image = video.read()

                if not success:
                    raise IOError("Cannot read from video file (frame {} out of {})".format(int(next_keyframe), video_total_frames))

                if matte_type != 0:
                    image = MatteTrimmer.crop_image(image, cropping_bounds)  # strip the black bars first

                image = cv2.resize(image, (width, height))

                # Append the resized frame to the running output strip.
                if output_image is None:
                    output_image = image
                else:
                    output_image = concatenate([output_image, image])

                finished_frames += 1
                next_keyframe += keyframe_interval

                if not quiet:
                    progress.write(finished_frames / nframes)
        finally:
            video.release()

        return output_image

    @staticmethod
    def average_image(image, direction):
        """
        Averages the colors in an axis across an entire image

        Parameters:
            image (arr x.y.c): image as 3-dimensional numpy array
            direction (str): direction to average frames ("horizontal" or "vertical")

        Returns:
            image, with pixel data averaged along provided axis

        Raises:
            ValueError: if direction is not "horizontal" or "vertical"
        """

        height, width, depth = image.shape

        # Collapse the image to a single row or column (the resize
        # interpolation performs the averaging), then stretch it back.
        if direction == "horizontal":
            scale_height = 1
            scale_width = width
        elif direction == "vertical":
            scale_height = height
            scale_width = 1
        else:
            raise ValueError("Invalid direction specified")

        image = cv2.resize(image, (scale_width, scale_height))  # collapse
        image = cv2.resize(image, (width, height))  # stretch back to the original size

        return image

    @staticmethod
    def motion_blur(image, direction, blur_amount):
        """
        Blurs the pixels in a given axis across an entire image.

        Parameters:
            image (arr x.y.c): image as 3-dimensional numpy array
            direction (str): direction of stacked images for blurring ("horizontal" or "vertical")
            blur_amount (int): how much to blur the image, as the convolution kernel size

        Returns:
            image, with pixel data blurred along provided axis

        Raises:
            ValueError: if direction is not "horizontal" or "vertical"
        """

        kernel = np.zeros((blur_amount, blur_amount))  # create convolution kernel

        # Fill the kernel's center column/row so the blur runs across the
        # axis along which the frames are stacked.
        if direction == "horizontal":
            kernel[:, int((blur_amount - 1)/2)] = np.ones(blur_amount)  # blur vertically
        elif direction == "vertical":
            kernel[int((blur_amount - 1)/2), :] = np.ones(blur_amount)  # blur horizontally
        else:
            raise ValueError("Invalid direction specified")

        kernel /= blur_amount  # normalize so brightness is preserved

        return cv2.filter2D(image, -1, kernel)  # apply the kernel to the image

    @staticmethod
    def nframes_from_interval(source, interval):
        """
        Calculates the number of frames available in a video file for a given capture interval

        Parameters:
            source (str): filepath to source video file
            interval (float): capture frame every i seconds

        Returns:
            number of frames per time interval (int)

        Raises:
            FileNotFoundError: if the source video cannot be opened
        """
        video = cv2.VideoCapture(source)  # open video file
        if not video.isOpened():
            raise FileNotFoundError("Source Video Not Found")

        try:
            frame_count = video.get(cv2.CAP_PROP_FRAME_COUNT)
            fps = video.get(cv2.CAP_PROP_FPS)
            duration = frame_count / fps  # duration of the video, in seconds
        finally:
            video.release()  # close the video handle even if the math above fails

        return int(round(duration / interval))  # number of intervals that fit in the duration

    @staticmethod
    def interval_from_nframes(source, nframes):
        """
        Calculates the capture interval, in seconds, for a video file given the
        number of frames to capture

        Parameters:
            source (str): filepath to source video file
            nframes (int): number of frames to capture from the video file

        Returns:
            time interval (seconds) between frame captures (float)

        Raises:
            FileNotFoundError: if the source video cannot be opened
        """
        video = cv2.VideoCapture(source)  # open video file
        if not video.isOpened():
            raise FileNotFoundError("Source Video Not Found")

        try:
            frame_count = video.get(cv2.CAP_PROP_FRAME_COUNT)
            fps = video.get(cv2.CAP_PROP_FPS)
            keyframe_interval = frame_count / nframes  # frames between captures
        finally:
            video.release()  # close the video handle even if the math above fails

        return keyframe_interval / fps  # seconds between captures
|
|
|
|
class MatteTrimmer:
    """
    Functions for finding and removing black mattes around video frames
    """

    @staticmethod
    def find_matrix_edges(matrix, threshold):
        """
        Finds the start and end points of a 1D array above a given threshold

        Parameters:
            matrix (arr, 1.x): 1D array of data to check
            threshold (value): valid data is above this trigger level

        Returns:
            tuple with the array indices of data bounds, start and end
            (both inclusive); (None, None) if no value exceeds the threshold

        Raises:
            ValueError: if the input is not a one-dimensional sequence
        """

        if not isinstance(matrix, (list, tuple, np.ndarray)):
            raise ValueError("Provided matrix is not the right size (must be 1D)")

        # Coerce to ndarray so plain lists/tuples work too; the previous
        # implementation accepted them in the isinstance check but then
        # crashed reading '.shape' on them.
        matrix = np.asarray(matrix)
        if matrix.ndim != 1:
            raise ValueError("Provided matrix is not the right size (must be 1D)")

        above = np.flatnonzero(matrix > threshold)  # indices of every value over the trigger level
        if above.size == 0:
            return (None, None)  # no valid data found

        return (int(above[0]), int(above[-1]))

    @staticmethod
    def find_larger_bound(first, second):
        """
        Takes two sets of diagonal rectangular boundary coordinates and determines
        the set of rectangular boundary coordinates that contains both

        Parameters:
            first (arr, 1.2.2): pair of rectangular coordinates, in the form [(X,Y), (X,Y)]
            second (arr, 1.2.2): pair of rectangular coordinates, in the form [(X,Y), (X,Y)]

        Where for both arrays the first coordinate is in the top left-hand corner,
        and the second coordinate is in the bottom right-hand corner.

        Returns:
            numpy coordinate matrix containing both of the provided boundaries
        """
        left_edge = min(first[0][0], second[0][0])
        right_edge = max(first[1][0], second[1][0])

        top_edge = min(first[0][1], second[0][1])
        bottom_edge = max(first[1][1], second[1][1])

        return np.array([[left_edge, top_edge], [right_edge, bottom_edge]])

    @staticmethod
    def valid_bounds(bounds):
        """
        Checks if the frame bounds are a valid format

        Parameters:
            bounds (arr, 1.2.2): pair of rectangular coordinates, in the form [(X,Y), (X,Y)]

        Returns:
            True or False
        """

        # A missing bounds object is invalid; previously iterating 'None'
        # raised a TypeError (e.g. via determine_video_bounds).
        if bounds is None:
            return False

        # Every coordinate must be present.
        for corner in bounds:
            for coordinate in corner:
                if coordinate is None:
                    return False

        # The first corner must be up and to the left of the second.
        if bounds[0][0] > bounds[1][0] or \
           bounds[0][1] > bounds[1][1]:
            return False

        return True

    @staticmethod
    def determine_image_bounds(image, threshold):
        """
        Determines if there are any hard mattes (black bars) surrounding
        an image on either the top (letterboxing) or the sides (pillarboxing)

        Parameters:
            image (arr, x.y.c): image as 3-dimensional numpy array
            threshold (8-bit int): min color channel value to judge as 'image present'

        Returns:
            success (bool): True or False if the bounds are valid
            image_bounds: numpy coordinate matrix with the two opposite corners of the
                image bounds, in the form [(X,Y), (X,Y)]
        """

        height, width, depth = image.shape

        # Sum each row across width and channels; rows whose total exceeds
        # the scaled threshold contain image data (top/bottom edges).
        horizontal_sums = np.sum(image, axis=(1, 2))
        hthreshold = (threshold * width * depth)
        vertical_edges = MatteTrimmer.find_matrix_edges(horizontal_sums, hthreshold)

        # Same per-column, for the left/right edges.
        vertical_sums = np.sum(image, axis=(0, 2))
        vthreshold = (threshold * height * depth)
        horizontal_edges = MatteTrimmer.find_matrix_edges(vertical_sums, vthreshold)

        # Corners are (X, Y): top-left then bottom-right, inclusive.
        image_bounds = np.array([[horizontal_edges[0], vertical_edges[0]], [horizontal_edges[1], vertical_edges[1]]])

        return MatteTrimmer.valid_bounds(image_bounds), image_bounds

    @staticmethod
    def determine_video_bounds(source, nsamples, threshold):
        """
        Determines if any matting exists in a video source

        Parameters:
            source (str): filepath to source video file
            nsamples (int): number of frames from the video to determine bounds,
                evenly spaced throughout the video
            threshold (8-bit int): min color channel value to judge as 'image present'

        Returns:
            success (bool): True or False if the bounds are valid
            video_bounds: numpy coordinate matrix with the two opposite corners of the
                video bounds, in the form [(X,Y), (X,Y)], or None if no sample
                produced valid bounds

        Raises:
            FileNotFoundError: if the source video cannot be opened
            ValueError: if nsamples is not a positive integer
            IOError: if a frame cannot be read from the video
        """
        video = cv2.VideoCapture(source)  # open video file
        if not video.isOpened():
            raise FileNotFoundError("Source Video Not Found")

        # Release the capture even if an error is raised mid-sampling.
        try:
            video_total_frames = video.get(cv2.CAP_PROP_FRAME_COUNT)
            if not isinstance(nsamples, int) or nsamples < 1:
                raise ValueError("Number of samples must be a positive integer")
            keyframe_interval = video_total_frames / nsamples  # frames between samples

            # Confirm the file is readable before sampling.
            success, image = video.read()
            if not success:
                raise IOError("Cannot read from video file")

            next_keyframe = keyframe_interval / 2  # sample from the middle of each interval
            video_bounds = None

            for frame_number in range(nsamples):
                video.set(cv2.CAP_PROP_POS_FRAMES, int(next_keyframe))
                success, image = video.read()

                if not success:
                    raise IOError("Cannot read from video file")

                success, frame_bounds = MatteTrimmer.determine_image_bounds(image, threshold)

                # Advance BEFORE the validity check; the previous code put the
                # increment after 'continue', re-sampling the same frame when
                # its bounds were invalid.
                next_keyframe += keyframe_interval

                if not success:
                    continue  # this frame gave no usable bounds, try the next sample

                # Grow the running bounds so they contain every sampled frame.
                video_bounds = frame_bounds if video_bounds is None else MatteTrimmer.find_larger_bound(video_bounds, frame_bounds)
        finally:
            video.release()

        # No sample produced valid bounds: report failure instead of crashing
        # inside valid_bounds(None).
        if video_bounds is None:
            return False, None

        return MatteTrimmer.valid_bounds(video_bounds), video_bounds

    @staticmethod
    def crop_image(image, bounds):
        """
        Crops a provided image by the coordinate bounds pair provided.

        Parameters:
            image (arr, x.y.c): image as 3-dimensional numpy array
            bounds (arr, 1.2.2): pair of rectangular coordinates, in the form [(X,Y), (X,Y)]

        Returns:
            image as 3-dimensional numpy array, cropped to the coordinate bounds
        """
        # Bounds are INCLUSIVE coordinates (see find_matrix_edges and the
        # '+1' width/height math in FrameVis.visualize), so the slice end
        # needs +1; the previous slice dropped the last row and column.
        return image[bounds[0][1]:bounds[1][1] + 1, bounds[0][0]:bounds[1][0] + 1]
|
|
class ProgressBar:
    """
    Generates a progress bar for the console output

    Args:
        pre (str): string to prepend before the progress bar
        bar_length (int): length of the progress bar itself, in characters
        print_elapsed (bool): option to print time elapsed or not

    Attributes:
        pre (str): string to prepend before the progress bar (a tab is
            appended when non-empty, to separate the label from the bar)
        bar_length (int): length of the progress bar itself, in characters
        print_elapsed (bool): option to print time elapsed or not
        __start_time (float): starting time for the progress bar, in unix
            seconds (only set when print_elapsed is True)
    """

    def __init__(self, pre="", bar_length=25, print_elapsed=True):
        pre = (pre + '\t') if pre != "" else pre  # tab separates the label from the bar
        self.pre = pre
        self.bar_length = bar_length
        self.print_elapsed = print_elapsed
        if self.print_elapsed:
            self.__start_time = time.time()  # record the start for the elapsed-time readout

    def write(self, percent):
        """Prints a progress bar to the console based on the input percentage (float)."""
        # Carriage return redraws the bar in place until it completes.
        term_char = '\r' if percent < 1.0 else '\n'

        filled_size = int(round(self.bar_length * percent))
        progress_bar = "#" * filled_size + " " * (self.bar_length - filled_size)

        time_string = ""
        if self.print_elapsed:
            time_elapsed = time.time() - self.__start_time
            time_string = "\tTime Elapsed: {}".format(time.strftime("%H:%M:%S", time.gmtime(time_elapsed)))

        print("{}[{}]\t{:.2%}{}".format(self.pre, progress_bar, percent, time_string), end=term_char, flush=True)
| |
|
|
|
|
def main():
    """Command-line entry point: parse arguments, build the visualization, and save it to disk."""
    parser = argparse.ArgumentParser(description="video frame visualizer and movie barcode generator", add_help=False)

    parser.add_argument("source", help="file path for the video file to be visualized", type=str)
    parser.add_argument("destination", help="file path output for the final image", type=str)
    parser.add_argument("-n", "--nframes", help="the number of frames in the visualization", type=int)
    parser.add_argument("-i", "--interval", help="interval between frames for the visualization", type=float)
    parser.add_argument("-h", "--height", help="the height of each frame, in pixels", type=int, default=FrameVis.default_frame_height)
    parser.add_argument("-w", "--width", help="the output width of each frame, in pixels", type=int, default=FrameVis.default_frame_width)
    parser.add_argument("-d", "--direction", help="direction to concatenate frames, horizontal or vertical", type=str,
        choices=["horizontal", "vertical"], default=FrameVis.default_direction)
    parser.add_argument("-t", "--trim", help="detect and trim any hard matting (letterboxing or pillarboxing)", action='store_true', default=False)
    parser.add_argument("-a", "--average", help="average colors for each frame", action='store_true', default=False)
    parser.add_argument("-b", "--blur", help="apply motion blur to the frames (kernel size)", type=int, nargs='?', const=100, default=0)
    parser.add_argument("-q", "--quiet", help="mute console outputs", action='store_true', default=False)
    parser.add_argument("--help", action="help", help="show this help message and exit")

    args = parser.parse_args()

    # Frame count may be given directly or derived from a capture interval.
    if args.nframes is None:
        if args.interval is None:
            parser.error("You must provide either an --(n)frames or --(i)nterval argument")
        args.nframes = FrameVis.nframes_from_interval(args.source, args.interval)

    # Averaging and motion blur are mutually exclusive post-processing steps.
    if args.average is True and args.blur != 0:
        parser.error("Cannot (a)verage and (b)lur, you must choose one or the other")

    frame_vis = FrameVis()

    output_image = frame_vis.visualize(args.source, args.nframes, height=args.height, width=args.width,
        direction=args.direction, trim=args.trim, quiet=args.quiet)

    # Apply any requested post-processing to the assembled image.
    if args.average or args.blur != 0:
        if args.average:
            if not args.quiet:
                print("Averaging frame colors... ", end="", flush=True)
            output_image = frame_vis.average_image(output_image, args.direction)

        if args.blur != 0:
            if not args.quiet:
                print("Adding motion blur to final frame... ", end="", flush=True)
            output_image = frame_vis.motion_blur(output_image, args.direction, args.blur)

        if not args.quiet:
            print("done")

    cv2.imwrite(args.destination, output_image)  # save the result

    if not args.quiet:
        print("Visualization saved to {}".format(args.destination))
|
|
|
|
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|
|