diff --git a/custom_components/eufy_security/eufy_security_api/camera.py b/custom_components/eufy_security/eufy_security_api/camera.py index fe4373e..c11ccc5 100644 --- a/custom_components/eufy_security/eufy_security_api/camera.py +++ b/custom_components/eufy_security/eufy_security_api/camera.py @@ -140,7 +140,7 @@ async def start_livestream(self) -> bool: self.stream_status = StreamStatus.STREAMING return True - async def check_and_stop_livestream(self, retry: bool): + async def check_and_stop_livestream(self, retry): _LOGGER.debug(f"check_and_stop_livestream - start - {retry}") if self.stream_status != StreamStatus.IDLE: await self.stop_livestream() diff --git a/custom_components/eufy_security/eufy_security_api/p2p_streamer.py b/custom_components/eufy_security/eufy_security_api/p2p_streamer.py index dc56f6b..5fa1baa 100644 --- a/custom_components/eufy_security/eufy_security_api/p2p_streamer.py +++ b/custom_components/eufy_security/eufy_security_api/p2p_streamer.py @@ -19,6 +19,8 @@ class P2PStreamer: def __init__(self, camera) -> None: self.camera = camera + self.completed_future = None + async def chunk_generator(self, queue): retry = 0 @@ -36,7 +38,7 @@ async def chunk_generator(self, queue): retry = retry + 1 await asyncio.sleep(0.1) - async def write_bytes(self, queue): + async def write_bytes(self, queue, completed_future): url = GO2RTC_API_URL.format(self.camera.config.rtsp_server_address, GO2RTC_API_PORT) url = f"{url}?dst={str(self.camera.serial_no)}" @@ -60,7 +62,7 @@ async def write_bytes(self, queue): _LOGGER.debug(f"write_bytes general exception %s - traceback: %s", ex, traceback.format_exc()) _LOGGER.debug("write_bytes - ended") - await self.stop(retry) + completed_future.get_loop().call_soon_threadsafe(completed_future.set_result, retry) async def create_stream_on_go2rtc(self): parameters = {"name": str(self.camera.serial_no), "src": str(self.camera.serial_no)} @@ -74,18 +76,17 @@ async def create_stream_on_go2rtc(self): def p2p_worker(self): new_loop = asyncio.new_event_loop() asyncio.set_event_loop(new_loop) - new_loop.run_until_complete(self.write_bytes(self.camera.video_queue)) - return + new_loop.run_until_complete(self.write_bytes(self.camera.video_queue, self.completed_future)) async def start(self): """start streaming thread""" # send API command to go2rtc to create a new stream await self.create_stream_on_go2rtc() + self.completed_future = asyncio.get_running_loop().create_future() p2p_thread = threading.Thread(target=self.p2p_worker, daemon=True) p2p_thread.start() - #asyncio.new_event_loop().create_task(self.write_bytes(self.camera.audio_queue)) - - async def stop(self, retry): - _LOGGER.debug(f"p2p - initiate stop - {retry}") - await self.camera.check_and_stop_livestream(retry) + _LOGGER.debug(f"start - {self.completed_future}") + await asyncio.wait_for(self.completed_future, timeout=None) + retry = self.completed_future.result() + await self.camera.check_and_stop_livestream(retry) \ No newline at end of file