Skip to content

Commit

Permalink
feat: improve retry for streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
fuatakgun committed Nov 23, 2023
1 parent 27e0ebe commit 47c132b
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 9 deletions.
5 changes: 4 additions & 1 deletion custom_components/eufy_security/eufy_security_api/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,12 @@ async def _handle_rtsp_livestream_stopped(self, event: Event):
self.stream_status = StreamStatus.IDLE

async def _handle_livestream_video_data_received(self, event: Event):
# automatically find this function for respective event
await self.video_queue.put(event.data["buffer"]["data"])

async def _handle_livestream_audio_data_received(self, event: Event):
pass
#await self.video_queue.put(event.data["buffer"]["data"])

async def _initiate_start_stream(self, stream_type) -> bool:
self.set_stream_prodiver(stream_type)
self.stream_status = StreamStatus.PREPARING
Expand Down
1 change: 1 addition & 0 deletions custom_components/eufy_security/eufy_security_api/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
STREAM_SLEEP_SECONDS = 0.25
GO2RTC_RTSP_PORT = 8554
GO2RTC_API_PORT = 1984
GO2RTC_API_URL = "http://{0}:{1}/api/stream"


class MessageField(Enum):
Expand Down
32 changes: 24 additions & 8 deletions custom_components/eufy_security/eufy_security_api/p2p_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import traceback
import aiohttp
import os
from .const import GO2RTC_API_PORT
from .const import GO2RTC_API_PORT, GO2RTC_API_URL

_LOGGER: logging.Logger = logging.getLogger(__package__)

Expand All @@ -27,27 +27,37 @@ async def chunk_generator(self):
yield bytearray(item)
except TimeoutError as te:
_LOGGER.debug(f"chunk_generator timeout Exception %s - traceback: %s", te, traceback.format_exc())
break
raise te

async def write_bytes(self):
url = f"http://{self.camera.config.rtsp_server_address}:{GO2RTC_API_PORT}/api/stream?dst={str(self.camera.serial_no)}"
headers = {'Content-Type': 'application/octet-stream'}
url = GO2RTC_API_URL.format(self.camera.config.rtsp_server_address, GO2RTC_API_PORT)
url = f"{url}?dst={str(self.camera.serial_no)}"

retry = False
try:
async with aiohttp.ClientSession() as session:
resp = await session.post(url, data = self.chunk_generator(), headers=headers, timeout=aiohttp.ClientTimeout(total=None, connect=5))
resp = await session.post(url, data = self.chunk_generator(), timeout=aiohttp.ClientTimeout(total=None, connect=5))
_LOGGER.debug(f"write_bytes - post response - {resp.status} - {await resp.text()}")

except (asyncio.exceptions.TimeoutError, asyncio.exceptions.CancelledError) as ex:
# live stream probabaly stopped, handle peacefully
_LOGGER.debug(f"write_bytes timeout/cancelled exception %s - traceback: %s", ex, traceback.format_exc())
except aiohttp.client_exceptions.ServerDisconnectedError as ex:
# connection to go2rtc server is broken, try again
_LOGGER.debug(f"write_bytes server_disconnected exception %s - traceback: %s", ex, traceback.format_exc())
retry = True
except Exception as ex: # pylint: disable=broad-except
_LOGGER.debug(f"write_bytes exception %s - traceback: %s", ex, traceback.format_exc())
# other exceptions, log the error
_LOGGER.debug(f"write_bytes general exception %s - traceback: %s", ex, traceback.format_exc())

_LOGGER.debug("write_bytes - ended")

# await self.stop()
await self.stop(retry)

async def create_stream_on_go2rtc(self):
parameters = {"name": str(self.camera.serial_no), "src": str(self.camera.serial_no)}
url = f"http://{self.camera.config.rtsp_server_address}:{GO2RTC_API_PORT}/api/streams"
url = GO2RTC_API_URL.format(self.camera.config.rtsp_server_address, GO2RTC_API_PORT)
url = f"{url}s"
async with aiohttp.ClientSession() as session:
async with session.put(url, params=parameters) as response:
result = response.status, await response.text()
Expand All @@ -61,3 +71,9 @@ async def start(self):

async def stop(self):
await self.camera.check_and_stop_livestream()

async def stop(self, retry: boolean):
await self.camera.check_and_stop_livestream()
await asyncio.sleep(5)
if retry is True:
await self.camera.start_livestream()

0 comments on commit 47c132b

Please sign in to comment.