import pytest import time import cv2 from src.api.streaming.encoder import StreamEncoder WIDTH = 1920 HEIGHT = 1080 VIDEO_FILE = "test/video/Big_Buck_Bunny_1080_10s_10MB.mp4" ENCODED_FILE = "test/video/Big_Buck_Bunny_1080_10s_10MB.h264" @pytest.mark.timeout(5) def test_start_stop_encoder(): encoder = StreamEncoder(WIDTH, HEIGHT, lambda x: None) encoder.start_encoder() time.sleep(1) encoder.stop_encoder() time.sleep(1) assert True data = b"" @pytest.mark.timeout(10) def test_process_video(): global data data = b"" vid = cv2.VideoCapture(VIDEO_FILE) def add_frame_to_buffer(d: bytes): global data data += d encoder = StreamEncoder(WIDTH, HEIGHT, add_frame_to_buffer) encoder.start_encoder() while True: ret, frame = vid.read() if not ret: break encoder.commit_frame(frame) encoder.stop_encoder() with open(ENCODED_FILE, "rb") as f: ref_data = f.read() with open("/tmp/test_output.h264", "wb") as f: f.write(data) assert len(data) == len(ref_data) assert data == ref_data