Source code for snub.io.video

import numpy as np
import imageio
import scipy
import tqdm
import os
from vidio.read import VideoReader


[docs]def azure_ir_transform(input_image): """ Apply the following transformation to better visualize high-dynamic-range infrared video (used in :py:func:`snub.io.video.transform_azure_ir_stream`) This method is useful for viewing the infrared stream output by an `Azure Kinect (K4A) depth sensor <https://github.com/microsoft/Azure-Kinect-Sensor-SDK>`_. :: transformed_image = log(input_image)*322 - 350 Parameters ---------- input_image: ndarray Input image as a numpy array (can be any shape) Returns ------- transformed_image: ndarray """ transformed_image = np.log(np.float32(input_image) + 100) * 70 - 350 transformed_image = np.uint8(np.clip(transformed_image, 0, 255)) return transformed_image
[docs]def transform_azure_ir_stream(inpath, outpath=None, num_frames=None, quality=7): """ Convert a 16bit monochrome video to an 8bit mp4 video that can be viewed within SNUB. Each frame is transformed using :py:func:`snub.io.video.azure_ir_transform` and the output video is compressed using ffmpeg. This method is useful for viewing the infrared stream output by an `Azure Kinect (K4A) depth sensor <https://github.com/microsoft/Azure-Kinect-Sensor-SDK>`_. Parameters ---------- inpath : str Path to the input video outpath: str, default=None Path where the output video will be written (must end in '.mp4'). If ``outpath=None``, then the output video will have the same location as ``inpath`` with the file extension switched to ``.mp4``. num_frames: int, default=None Number of frames to convert. By default the full video is converted. quality: int, default=7 Quality of output video (passed to imageio writer). """ if not os.path.exists(inpath): raise AssertionError("The video {} does not exist".format(inpath)) if outpath is None: outpath = os.path.splitext(inpath)[0] + ".mp4" if outpath == inpath: raise AssertionError( "Cannot overwrite the input video. Make sure the input video does not end in .mp4 or specify an alternative `outpath`" ) elif not os.path.splitext(outpath)[1] == ".mp4": raise AssertionError("`outpath` must end with .mp4") reader = imageio.get_reader(inpath, pixelformat="gray16", dtype="uint16") num_frames_in_video = reader.count_frames() fps = reader.get_meta_data()["fps"] if num_frames is None: num_frames = num_frames_in_video elif num_frames > num_frames_in_video: raise AssertionError( "`num_frames={} but there are only {} frames in the input video".format( num_frames, num_frames_in_video ) ) print("Saving transformed video to " + outpath) with imageio.get_writer( outpath, fps=fps, quality=quality, pixelformat="yuv420p" ) as writer: for i in tqdm.trange(num_frames): img = reader.get_data(i) img = azure_ir_transform(img) writer.append_data(img)
[docs]def detrend_video( videopath_in, videopath_out, window_length=150, window_step=10, pctl=20, clipping_bounds=(-20, 45), quality=6, ): """ Detrend a video by subtracting a pixel-wise running percentile. Parameters ---------- videopath_in : str Path to the input video videopath_out : str Path to write the detrended video window_length: int, default=75 Window over which to calculate the running percentile. window_step: int, default=5 Downsampling factor for computing running percentile. For frame `i`, the frames used to compute the percetile will be `[i, i-window_step, i-2*window_step,...,i-window_length]` pctl: int, default=20 Percentile used to calculate background clipping_bounds: tuple(float,float), default=(-20,45) Clipping bounds for normalizing detrended video. The interval defined by `clip` is rescaled to [0,255] in the final video. quality: int, defaut=6 Quality of output video (passed to imageio writer). """ reader = imageio.get_reader(videopath_in) metadata = reader.get_meta_data() buffer = [ np.zeros(metadata["size"][::-1]) for i in range(0, window_length, window_step) ] writer = imageio.get_writer( videopath_out, fps=metadata["fps"], quality=quality, macro_block_size=1, pixelformat=metadata["pix_fmt"], ) for im in tqdm.tqdm(reader, total=reader.count_frames()): x = im[:, :, 0].astype(float) buffer.insert(0, x) buffer = buffer[:window_length] background = np.percentile(buffer[::window_step], pctl, axis=0) x = np.clip(x - background, *clipping_bounds) x = (x - clipping_bounds[0]) / (clipping_bounds[1] - clipping_bounds[0]) * 255 writer.append_data(np.repeat(x[:, :, None], 3, axis=2).astype(np.uint8)) writer.close()
[docs]def fast_prct_filt(input_data, level=8, frames_window=3000): """ Fast approximate percentage filtering Borrowed from CaImAn """ from scipy.ndimage import zoom data = np.atleast_2d(input_data).copy() T = np.shape(data)[-1] downsampfact = frames_window elm_missing = int(np.ceil(T * 1.0 / downsampfact) * downsampfact - T) padbefore = int(np.floor(elm_missing / 2.0)) padafter = int(np.ceil(elm_missing / 2.0)) tr_tmp = np.pad(data.T, ((padbefore, padafter), (0, 0)), mode="reflect") numFramesNew, num_traces = np.shape(tr_tmp) # % compute baseline quickly tr_BL = np.reshape( tr_tmp, (downsampfact, int(numFramesNew / downsampfact), num_traces), order="F" ) tr_BL = np.percentile(tr_BL, level, axis=0) tr_BL = zoom( np.array(tr_BL, dtype=np.float32), [downsampfact, 1], order=3, mode="nearest", cval=0.0, prefilter=True, ) if padafter == 0: data -= tr_BL.T else: data -= tr_BL[padbefore:-padafter].T return data.squeeze()
[docs]def generate_video_timestamps(videopath, fps=None, start_time=0): """ Generate timestamps from a video file. """ reader = VideoReader(videopath) if fps is None: fps = reader.fps timestamps = np.arange(len(reader)) / fps + start_time return timestamps