import pytest
import cv2
import numpy as np

from src.api.streaming.video_source import OpenCV_Video_Source_Async, StreamTrackOpencv

pytest_plugins = ('pytest_asyncio',)


@pytest.mark.timeout(500)
@pytest.mark.asyncio
async def test_async_video_source():
    """Check that OpenCV_Video_Source_Async yields valid frames from a video file.

    Opens the sample clip with OpenCV, wraps its ``read`` call in a frame
    function, starts the async video source and awaits a fixed number of
    frames. Every frame must be a ``numpy.ndarray`` with the expected
    (height, width, channels) shape.

    The capture and the video source are torn down in ``finally`` blocks so a
    failing assertion does not leave them open.
    """
    video_path = "test/video/Big_Buck_Bunny_1080_10s_10MB.mp4"
    frames_to_check = 120
    expected_shape = (1080, 1920, 3)

    v_src = cv2.VideoCapture(video_path)
    try:
        # Fail early with a clear message instead of an opaque "frame is None"
        # further down when the fixture file is missing or unreadable.
        assert v_src.isOpened(), f"could not open test video: {video_path}"

        def opencv_frame_func():
            # VideoCapture.read() returns (success_flag, frame); only the
            # frame is handed to the video source.
            return v_src.read()[1]

        video_source = OpenCV_Video_Source_Async(opencv_frame_func)
        video_source.start()
        try:
            for _ in range(frames_to_check):
                frame = await video_source.get_frame()
                assert frame is not None
                assert isinstance(frame, np.ndarray)
                assert frame.shape == expected_shape
        finally:
            # Stop the source before releasing the capture, so that
            # opencv_frame_func is not invoked on a released capture.
            video_source.stop()
    finally:
        v_src.release()