From ff23d2e747137b1bb40a688c6986fc400965a4d8 Mon Sep 17 00:00:00 2001 From: fuatakgun Date: Wed, 29 Nov 2023 22:58:57 +0100 Subject: [PATCH] feat: better P2P with audio --- .../eufy_security/eufy_security_api/camera.py | 2 +- .../eufy_security_api/p2p_streamer.py | 20 +++++++++---------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/custom_components/eufy_security/eufy_security_api/camera.py b/custom_components/eufy_security/eufy_security_api/camera.py index 11c250f..e700ad5 100644 --- a/custom_components/eufy_security/eufy_security_api/camera.py +++ b/custom_components/eufy_security/eufy_security_api/camera.py @@ -149,9 +149,9 @@ async def check_live_stream(self): async def start_livestream(self) -> bool: """Process start p2p livestream call""" - self.stream_future = asyncio.create_task(self.p2p_streamer.start()) if await self._initiate_start_stream(StreamProvider.P2P) is False: return False + self.stream_future = asyncio.create_task(self.p2p_streamer.start()) self.stream_checker = asyncio.create_task(self.check_live_stream()) self.stream_status = StreamStatus.STREAMING return True diff --git a/custom_components/eufy_security/eufy_security_api/p2p_streamer.py b/custom_components/eufy_security/eufy_security_api/p2p_streamer.py index 8604a0f..43c2fba 100644 --- a/custom_components/eufy_security/eufy_security_api/p2p_streamer.py +++ b/custom_components/eufy_security/eufy_security_api/p2p_streamer.py @@ -24,19 +24,13 @@ def __init__(self, camera) -> None: async def chunk_generator(self, queue, queue_name): retry_count = 0 max_retry_count = 10 - try: - await asyncio.wait_for(self.camera.p2p_started_event.wait(), 5) - except asyncio.TimeoutError as te: - _LOGGER.debug(f"chunk_generator {queue_name} - event did not receive in timeout") - raise te - while retry_count < max_retry_count: try: item = queue.popleft() - _LOGGER.debug(f"chunk_generator {queue_name} yield data {retry_count} - {len(item)}") + #_LOGGER.debug(f"chunk_generator {queue_name} yield data {retry_count} - {len(item)}") retry_count = 0 yield item - except IndexError as qe: + except IndexError: retry_count = retry_count + 1 await asyncio.sleep(0.1) @@ -64,7 +58,7 @@ async def write_bytes(self, queue, queue_name): _LOGGER.debug(f"write_bytes {queue_name} general exception no retry {ex} - traceback: {traceback.format_exc()}") self.retry = False - _LOGGER.debug("write_bytes {queue_name} - ended") + _LOGGER.debug(f"write_bytes {queue_name} - ended") async def create_stream_on_go2rtc(self): parameters = {"name": str(self.camera.serial_no)} @@ -83,11 +77,15 @@ async def create_stream_on_go2rtc(self): result = response.status, await response.text() _LOGGER.debug(f"create_stream_on_go2rtc - put stream response {result}") + def run(self, queue, name): + asyncio.run(self.write_bytes(queue, name)) + async def start(self): """start streaming thread""" # send API command to go2rtc to create a new stream + self.retry = None await self.create_stream_on_go2rtc() await asyncio.gather( - self.write_bytes(self.camera.audio_queue, "audio"), - self.write_bytes(self.camera.video_queue, "video") + asyncio.to_thread(self.run, self.camera.audio_queue, "audio"), + asyncio.to_thread(self.run, self.camera.video_queue, "video") )