import fractions
import threading
import time
import typing

import janus
import numpy as np
from aiortc import MediaStreamTrack
from av import VideoFrame

# Clock used to stamp outgoing video frames: 90 kHz ticks.
_VIDEO_CLOCK_RATE = 90000
_VIDEO_TIME_BASE = fractions.Fraction(1, _VIDEO_CLOCK_RATE)


class OpenCV_Video_Source_Async():
    """
    Bridge a blocking frame producer into asyncio.

    ``frame_func`` is a synchronous callable that returns one frame per call.
    A daemon worker thread calls it in a loop and pushes each result into a
    bounded ``janus`` queue; coroutines pull frames with ``await get_frame()``.
    When the queue is full the freshly captured frame is dropped, so the
    worker thread never blocks on a slow consumer.

    NOTE(review): depending on the installed janus version, ``janus.Queue()``
    may have to be constructed while an event loop is running -- confirm
    against the janus release in use.
    """

    def __init__(self, frame_func: typing.Callable[[], np.ndarray]):
        """
        Args:
            frame_func: Zero-argument callable returning the next frame.
                It is invoked repeatedly from the worker thread.
        """
        self.frame_func: typing.Callable[[], np.ndarray] = frame_func
        self.thread = self._new_thread()
        self.running = False
        # Bounded so an idle consumer cannot make memory grow without limit.
        self.frame_queue: janus.Queue[np.ndarray] = janus.Queue(maxsize=256)
        self.frame_queue_sync = self.frame_queue.sync_q
        self.frame_queue_async = self.frame_queue.async_q

    def _new_thread(self) -> threading.Thread:
        """Create a fresh, not-yet-started daemon worker bound to ``run``."""
        return threading.Thread(target=self.run, args=(), daemon=True)

    def start(self):
        """
        Start the capture thread.

        Calling this while the worker is already alive is a no-op. After a
        ``stop()`` a new thread object is created, because a
        ``threading.Thread`` can only be started once.
        """
        if self.thread.is_alive():
            return
        if self.thread.ident is not None:
            # The previous worker already ran and finished; Thread objects
            # are single-use, so build a new one.
            self.thread = self._new_thread()
        self.running = True
        self.thread.start()

    def stop(self):
        """
        Ask the capture loop to exit and wait for the worker to finish.

        Safe to call when the worker was never started or has already
        stopped; joining an unstarted thread would raise ``RuntimeError``.
        """
        self.running = False
        if self.thread.is_alive():
            self.thread.join()

    def run(self):
        """Worker-thread body: capture frames until ``running`` is cleared."""
        while self.running:
            frame = self.frame_func()
            # This thread is the only producer, so the queue cannot fill up
            # between the check and the put; a full queue drops the frame.
            if not self.frame_queue_sync.full():
                self.frame_queue_sync.put(frame)

    async def get_frame(self) -> np.ndarray:
        """Wait for and return the next queued frame."""
        return await self.frame_queue_async.get()


class StreamTrackOpencv(MediaStreamTrack):
    """
    A video stream track fed by an ``OpenCV_Video_Source_Async``.
    """

    kind = "video"

    def __init__(self, video_source: OpenCV_Video_Source_Async):
        """
        Args:
            video_source: Source whose ``get_frame()`` supplies the frames.
                The track does not start or stop it.
        """
        super().__init__()
        self.video_source = video_source
        # Monotonic time of the first delivered frame; origin for pts.
        self._first_frame_time: typing.Optional[float] = None

    async def recv(self):
        """
        Return the next frame as an ``av.VideoFrame`` with timing set.

        ``VideoFrame.from_ndarray`` leaves ``pts`` and ``time_base`` unset,
        so both are stamped here: ``pts`` is the time elapsed since the first
        frame, expressed in 90 kHz ticks.

        NOTE(review): ``from_ndarray`` is called with its default pixel
        format. If ``frame_func`` yields OpenCV-native BGR images the colour
        channels may need ``format="bgr24"`` -- confirm against the producer.
        """
        frame = await self.video_source.get_frame()
        video_frame = VideoFrame.from_ndarray(frame)

        now = time.monotonic()
        if self._first_frame_time is None:
            self._first_frame_time = now
        elapsed = now - self._first_frame_time
        video_frame.pts = int(elapsed * _VIDEO_CLOCK_RATE)
        video_frame.time_base = _VIDEO_TIME_BASE
        return video_frame