"""Video readers for Stone Soup.
This is a collection of video readers for Stone Soup, allowing quick reading
of video data/streams.
"""
import datetime
import threading
from collections.abc import Mapping, Sequence
from queue import Queue
from typing import Any
from urllib.parse import ParseResult
import numpy as np
try:
import ffmpeg
import moviepy.editor as mpy
except ImportError as error:
raise ImportError(
"Usage of video processing classes requires that the optional"
"package dependencies 'moviepy' and 'ffmpeg-python' are installed. "
"This can be achieved by running "
"'python -m pip install stonesoup[video]'")\
from error
from .base import FrameReader
from .file import FileReader
from .url import UrlReader
from ..base import Property
from ..buffered_generator import BufferedGenerator
from ..types.sensordata import ImageFrame
class VideoClipReader(FileReader, FrameReader):
"""VideoClipReader
A simple reader that uses MoviePy_ to read video frames from a file.
Usage of MoviePy allows for the application of clip transformations
and effects, as per the MoviePy documentation_. Upon instantiation,
the underlying MoviePy `VideoFileClip` instance can be accessed
through the :attr:`clip` attribute. This can then be used
as expected, e.g.:
.. code-block:: python
# Rearrange RGB to BGR
def arrange_bgr(image):
return image[:, :, [2, 1, 0]]
reader = VideoClipReader("path_to_file")
reader.clip = reader.clip.fl_image(arrange_bgr)
for timestamp, frame in reader:
# The generated frame.pixels will now
# be arranged in BGR format.
...
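A sub-clip can also be read by supplying :attr:`start_time` and
:attr:`end_time` offsets, together with an explicit :attr:`timestamp` for
the first frame. The offsets and timestamp below are purely illustrative:
.. code-block:: python

    import datetime

    # Illustrative values; adjust to the clip being read
    reader = VideoClipReader(
        "path_to_file",
        start_time=datetime.timedelta(seconds=10),
        end_time=datetime.timedelta(seconds=20),
        timestamp=datetime.datetime(2023, 1, 1, 12, 0, 0))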
.. _MoviePy: https://zulko.github.io/moviepy/index.html
.. _documentation: https://zulko.github.io/moviepy/getting_started/effects.html
""" # noqa:E501
start_time: datetime.timedelta = Property(
doc="Start time expressed as duration from the start of the clip",
default=datetime.timedelta(seconds=0))
end_time: datetime.timedelta = Property(
doc="End time expressed as duration from the start of the clip",
default=None)
timestamp: datetime.datetime = Property(
doc="Timestamp given to the first frame",
default=None)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
end_time_sec = self.end_time.total_seconds() if self.end_time is not None else None
self.clip = mpy.VideoFileClip(str(self.path)) \
.subclip(self.start_time.total_seconds(), end_time_sec)
@BufferedGenerator.generator_method
def frames_gen(self):
if self.timestamp is None:
self.timestamp = datetime.datetime.now()
start_time = self.timestamp
for timestamp_sec, pixels in self.clip.iter_frames(with_times=True):
timestamp = start_time + datetime.timedelta(seconds=timestamp_sec)
frame = ImageFrame(pixels, timestamp)
yield timestamp, frame
class FFmpegVideoStreamReader(UrlReader, FrameReader):
""" FFmpegVideoStreamReader
A threaded reader that uses ffmpeg-python_ to read frames from video
streams (e.g. RTSP) in real-time.
Notes
-----
- Use of this class requires that FFmpeg_ is installed on the host machine.
- By default, FFmpeg performs internal buffering of frames leading to a \
slight delay in the incoming frames (0.5-1 sec). To remove the delay it \
is recommended to set ``input_opts={'threads': 1, 'fflags': 'nobuffer'}`` \
when instantiating a reader, e.g.:
.. code-block:: python
video_reader = FFmpegVideoStreamReader('rtsp://192.168.0.10:554/1/h264minor',
input_opts={'threads': 1, 'fflags': 'nobuffer'})
for timestamp, frame in video_reader:
...
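Filters can also be applied to the stream; if a filter alters the frame
dimensions, :attr:`frame_size` should be set to match. The URL and filter
values below are purely illustrative:
.. code-block:: python

    video_reader = FFmpegVideoStreamReader(
        'rtsp://192.168.0.10:554/1/h264minor',
        input_opts={'threads': 1, 'fflags': 'nobuffer'},
        # Rescale frames to 320x240 and declare the new size explicitly
        filters=[('scale', ('320', '240'), {})],
        frame_size=(320, 240))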
.. _ffmpeg-python: https://github.com/kkroening/ffmpeg-python
.. _FFmpeg: https://www.ffmpeg.org/download.html
"""
url: ParseResult = Property(
doc="Input source to read video stream from, passed as input url argument. This can "
"include any valid FFmpeg input e.g. rtsp URL, device name when using 'dshow'/'v4l2'")
buffer_size: int = Property(
default=1,
doc="Size of the frame buffer. The frame buffer is used to cache frames in cases where "
"the stream generates frames faster than they are ingested by the reader. If "
"`buffer_size` is less than or equal to zero, the buffer size is infinite.")
input_opts: Mapping[str, str] = Property(
default=None,
doc="FFmpeg input options, provided in the form of a dictionary, whose keys correspond to "
"option names. (e.g. ``{'fflags': 'nobuffer'}``). The default is ``{}``.")
output_opts: Mapping[str, str] = Property(
default=None,
doc="FFmpeg output options, provided in the form of a dictionary, whose keys correspond "
"to option names. The default is ``{'f': 'rawvideo', 'pix_fmt': 'rgb24'}``.")
filters: Sequence[tuple[str, Sequence[Any], Mapping[Any, Any]]] = Property(
default=None,
doc="FFmpeg filters, provided in the form of a list of filter name, sequence of "
"arguments, mapping of key/value pairs (e.g. ``[('scale', ('320', '240'), {})]``). "
"Default `None` where no filter will be applied. Note that :attr:`frame_size` may "
"need to be set in when video size changed by filter.")
frame_size: tuple[int, int] = Property(
default=None,
doc="Tuple of frame width and height. Default `None` where it will be detected using "
"`ffprobe` against the input, but this may yield wrong width/height (e.g. when "
"filters are applied), and such this option can be used to override.")
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.input_opts is None:
self.input_opts = {}
if self.output_opts is None:
self.output_opts = {'f': 'rawvideo', 'pix_fmt': 'rgb24'}
if self.filters is None:
self.filters = []
self.buffer = Queue(maxsize=self.buffer_size)
if self.frame_size is not None:
self._stream_info = {
'width': self.frame_size[0],
'height': self.frame_size[1]}
else:
# Probe stream information
self._stream_info = next(
s
for s in ffmpeg.probe(self.url.geturl(), **self.input_opts)['streams']
if s['codec_type'] == 'video')
# Initialise stream
self.stream = ffmpeg.input(self.url.geturl(), **self.input_opts)
for filter_ in self.filters:
filter_name, filter_args, filter_kwargs = filter_
self.stream = self.stream.filter(
filter_name, *filter_args, **filter_kwargs
)
self.stream = (
self.stream
.output('pipe:', **self.output_opts)
.global_args('-y', '-loglevel', 'panic')
.run_async(pipe_stdout=True)
)
# Initialise capture thread
self._capture_thread = threading.Thread(target=self._run)
self._capture_thread.daemon = True
self._capture_thread.start()
@BufferedGenerator.generator_method
def frames_gen(self):
while self._capture_thread.is_alive():
# Blocking get; waits until a frame is available in the buffer
frame = self.buffer.get()
timestamp = frame.timestamp
yield timestamp, frame
def _run(self):
while self.stream.poll() is None:
width = int(self._stream_info['width'])
height = int(self._stream_info['height'])
# Read bytes from stream
in_bytes = self.stream.stdout.read(width * height * 3)
if in_bytes:
# Transform bytes to pixels
frame_np = (
np.frombuffer(in_bytes, np.uint8)
.reshape([height, width, 3])
)
frame = ImageFrame(frame_np, datetime.datetime.now())
# Write new frame to buffer
self.buffer.put(frame)