diff --git a/examples/frigate/.env b/examples/frigate/.env new file mode 100644 index 00000000..c82d627a --- /dev/null +++ b/examples/frigate/.env @@ -0,0 +1,9 @@ +# Software component versions. +MOSQUITTO_VERSION=2.0.15 +NTFY_VERSION=latest + +# Broker configuration (Mosquitto). +PORT_MOSQUITTO=1883 + +# Notification service configuration (ntfy). +PORT_NTFY=5555 diff --git a/examples/frigate/README.rst b/examples/frigate/README.rst index cb36a3b9..d5101e3d 100644 --- a/examples/frigate/README.rst +++ b/examples/frigate/README.rst @@ -17,7 +17,16 @@ The specific scenario is to setup a notification pipeline which looks like:: realtime local object detection for IP cameras. It uses MQTT to publish `events in JSON format`_ and `camera pictures in JPEG format`_. -`ntfy`_ (`ntfy on GitHub`_) is a simple HTTP-based pub-sub notification +`Eclipse Mosquitto`_ (`Mosquitto on GitHub`_) is an open source message broker +that implements the MQTT protocol versions 5.0, 3.1.1 and 3.1. Mosquitto is +lightweight and is suitable for use on all devices from low power single board +computers to full servers. + +`mqttwarn`_ (`mqttwarn on GitHub`_) is a highly configurable MQTT message router, +where the routing targets are notification plugins, written in Python. mqttwarn +has a corresponding notification plugin adapter for ntfy. + +`ntfy`_ (`ntfy on GitHub`_) is a simple HTTP-based `pub-sub`_ notification service, allowing you to send notifications to your phone or desktop from any computer, entirely without signup, cost or setup. @@ -26,16 +35,14 @@ any computer, entirely without signup, cost or setup. Synopsis ******** -1. Subscribe to ntfy topic by visiting http://localhost:5555/frigate-test. - -2. Publish Frigate sample events. +1. Publish Frigate sample events. .. code-block:: bash cat assets/frigate-event-new-good.json | jq -c | mosquitto_pub -t 'frigate/events' -l mosquitto_pub -f goat.png -t 'frigate/cam-testdrive/goat/snapshot' -3. Enjoy the outcome. +2. Enjoy the outcome. .. figure:: https://user-images.githubusercontent.com/453543/233172276-6a59cefa-6461-48bc-80f2-c355b6acc496.png @@ -54,27 +61,26 @@ your needs before running mqttwarn on it. If you also want to inspect the corresponding user-defined functions, you are most welcome. They are stored within `frigate.py`_. +Prerequisites +============= + +Acquire sources and go to the right directory:: + + git clone /~https://github.com/jpmens/mqttwarn + cd mqttwarn/examples/frigate + In a box ======== -Start the Mosquitto MQTT broker:: - - docker run --name=mosquitto --rm -it --publish=1883:1883 \ - eclipse-mosquitto:2.0.15 mosquitto -c /mosquitto-no-auth.conf +Start the Mosquitto MQTT broker and the ntfy service:: -Start the ntfy API service:: + docker compose up - docker run --name=ntfy --rm -it --publish=5555:80 binwiederhier/ntfy \ - serve \ - --base-url="http://localhost:5555" \ - --cache-file="/tmp/ntfy-cache.db" \ - --attachment-cache-dir="/tmp/ntfy-attachments" \ - --attachment-expiry-duration="168h" +Subscribe to ntfy topic by visiting http://localhost:5555/frigate-testdrive. Run mqttwarn:: - cd examples/frigate/ MQTTWARNINI=frigate.ini mqttwarn Run the example publisher program:: @@ -97,13 +103,30 @@ Publish an example image:: open /tmp/mqttwarn-frigate-cam-testdrive-goat.png -*********** -Development -*********** +******* +Details +******* -We are investigating how to `Receiving and processing MQTT messages from Frigate NVR`_, -and if it is feasible to make mqttwarn process JPEG content, see `Non-UTF-8 -encoding causes error`_. +The implementation is based on mqttwarn core, its `ntfy service plugin`_, the +mqttwarn configuration file ``frigate.ini``, as well as the user-defined function +file ``frigate.py``. You can inspect them below. + +.. admonition:: Inspect configuration file ``frigate.ini`` + :class: tip dropdown + + .. literalinclude:: frigate.ini + :language: ini + +.. admonition:: Inspect user-defined function file ``frigate.py`` + :class: tip dropdown + + .. literalinclude:: frigate.py + :language: python + + +***** +Tests +***** The `test_frigate.py`_ file covers different code paths by running a few Frigate event message samples through the machinery, and inspecting their outcomes. You can invoke @@ -143,17 +166,21 @@ Example snapshot image .. _Blake Blackshear: /~https://github.com/blakeblackshear .. _camera pictures in JPEG format: https://docs.frigate.video/integrations/mqtt/#frigatecamera_nameobject_namesnapshot .. _Changthangi: https://en.wikipedia.org/wiki/Changthangi +.. _Eclipse Mosquitto: https://mosquitto.org/ .. _events in JSON format: https://docs.frigate.video/integrations/mqtt/#frigateevents .. _Frigate: https://frigate.video/ .. _Frigate on GitHub: /~https://github.com/blakeblackshear/frigate .. _frigate.ini: /~https://github.com/jpmens/mqttwarn/blob/main/examples/frigate/frigate.ini .. _frigate.py: /~https://github.com/jpmens/mqttwarn/blob/main/examples/frigate/frigate.py .. _Jaromír Kalina: https://unsplash.com/@jkalinaofficial -.. _Non-UTF-8 encoding causes error: /~https://github.com/jpmens/mqttwarn/issues/634 +.. _Mosquitto on GitHub: /~https://github.com/eclipse/mosquitto +.. _mqttwarn: https://mqttwarn.readthedocs.io/ +.. _mqttwarn on GitHub: /~https://github.com/jpmens/mqttwarn .. _ntfy: https://ntfy.sh/ .. _ntfy on GitHub: /~https://github.com/binwiederhier/ntfy +.. _ntfy service plugin: https://mqttwarn.readthedocs.io/en/latest/notifier-catalog.html#ntfy .. _Philipp C. Heckel: /~https://github.com/binwiederhier -.. _Receiving and processing MQTT messages from Frigate NVR: /~https://github.com/jpmens/mqttwarn/issues/632 +.. _pub-sub: https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern .. _Sev: /~https://github.com/sevmonster .. _test_frigate.py: /~https://github.com/jpmens/mqttwarn/blob/main/examples/frigate/test_frigate.py .. _Unsplash License: https://unsplash.com/license diff --git a/examples/frigate/docker-compose.yml b/examples/frigate/docker-compose.yml new file mode 100644 index 00000000..a9aa83b0 --- /dev/null +++ b/examples/frigate/docker-compose.yml @@ -0,0 +1,61 @@ +version: "3.8" + +services: + + # --------- + # Mosquitto + # --------- + # https://hub.docker.com/_/eclipse-mosquitto + mosquitto: + image: eclipse-mosquitto:${MOSQUITTO_VERSION} + container_name: mosquitto + command: ["mosquitto", "-c", "/mosquitto-no-auth.conf"] + ports: + - "${PORT_MOSQUITTO}:${PORT_MOSQUITTO}" + + # Define health check for Mosquitto. + healthcheck: + test: [ "CMD", "mosquitto_sub", "-v", "-t", "foobar", "-E" ] + start_period: 1s + interval: 3s + timeout: 10s + retries: 60 + + # ---- + # ntfy + # ---- + # https://docs.ntfy.sh/install/#docker + # https://hub.docker.com/r/binwiederhier/ntfy + ntfy: + image: binwiederhier/ntfy:${NTFY_VERSION} + container_name: ntfy + command: > + serve + --base-url="http://localhost:5555" + --attachment-cache-dir="/tmp/ntfy-attachments" + --attachment-expiry-duration="168h" + environment: + # optional: set desired timezone + - TZ=UTC + ports: + - "${PORT_NTFY}:80" + healthcheck: + test: ["CMD-SHELL", "wget -q --tries=1 http://localhost:5555/v1/health -O - | grep -Eo '\"healthy\"\\s*:\\s*true' || exit 1"] + interval: 60s + timeout: 10s + retries: 3 + start_period: 40s + + # ------- + # Bundler + # ------- + # Wait for all defined services to be fully available by probing their health + # status, even when using `docker compose up --detach`. + # https://marcopeg.com/2019/docker-compose-healthcheck/ + start-dependencies: + image: dadarek/wait-for-dependencies + depends_on: + mosquitto: + condition: service_healthy + ntfy: + condition: service_healthy diff --git a/examples/frigate/frigate.ini b/examples/frigate/frigate.ini index 1ec1e75c..2ffac63d 100644 --- a/examples/frigate/frigate.ini +++ b/examples/frigate/frigate.ini @@ -1,3 +1,6 @@ +# Frigate » Forward events and snapshots to Ntfy, using mqttwarn. +# https://mqttwarn.readthedocs.io/en/latest/examples/frigate/README.html + [defaults] functions = frigate.py launch = ntfy, store-image @@ -14,10 +17,11 @@ status_publish = True # Docs: https://docs.frigate.video/integrations/mqtt/#frigateevents [config:ntfy] -targets = { +targets = { 'test': { 'url': 'http://username:password@localhost:5555/frigate-testdrive', 'attachment': '/tmp/mqttwarn-frigate-{camera}-{label}.png', + 'click': 'https://httpbin.org/anything?camera={event.camera}&label={event.label}&zone={event.entered_zones[0]}', } } @@ -25,9 +29,8 @@ targets = { filter = frigate_events_filter() alldata = frigate_events() targets = ntfy:test -title = {title} -format = {format} -click = {click} +title = {event.label} entered {event.entered_zones_str} at {event.time} +format = {event.label} was in {event.current_zones_str} before # Limit the alert based on camera/zone. frigate_skip_rules = { diff --git a/examples/frigate/frigate.py b/examples/frigate/frigate.py index 04fcb226..ea0763dd 100644 --- a/examples/frigate/frigate.py +++ b/examples/frigate/frigate.py @@ -1,106 +1,99 @@ # -*- coding: utf-8 -*- +""" +Frigate » Forward events and snapshots to Ntfy, using mqttwarn. + +https://mqttwarn.readthedocs.io/en/latest/examples/frigate/README.html +""" import dataclasses +import json import re +import typing as t from collections import OrderedDict from datetime import datetime, timezone -import typing as t from mqttwarn.context import RuntimeContext from mqttwarn.model import Service -try: - import json -except ImportError: - import simplejson as json - @dataclasses.dataclass class FrigateEvent: """ Manage inbound event data received from Frigate. """ + time: datetime camera: str label: str current_zones: t.List[str] entered_zones: t.List[str] - def f(self, value): - return [y.replace('_', ' ') for y in value] + @staticmethod + def format_list(value: t.List[str]) -> t.List[str]: + """ + Format a list for human consumption. + """ + return [y.replace("_", " ") for y in value] @property - def current_zones_str(self): - if self.current_zones: - return ', '.join(self.f(self.current_zones)) - else: - return '' + def current_zones_str(self) -> str: + """ + Serialize list of `current_zones` to string. + """ + return ", ".join(self.format_list(self.current_zones or [])) @property - def entered_zones_str(self): - if self.entered_zones: - return ', '.join(self.f(self.entered_zones)) - else: - return '' + def entered_zones_str(self) -> str: + """ + Serialize list of `entered_zones` to string. + """ + return ", ".join(self.format_list(self.entered_zones or [])) def to_dict(self) -> t.Dict[str, str]: + """ + Return Python dictionary from attributes. + """ return dataclasses.asdict(self) + @classmethod + def from_json(cls, payload: str) -> "FrigateEvent": + """ + Decode inbound Frigate event, in JSON format. + """ + # Decode JSON message. + after = json.loads(payload)["after"] -@dataclasses.dataclass -class NtfyParameters: - """ - Manage outbound parameter data for ntfy. - """ - title: str - format: str - click: str - attach: t.Optional[str] = None + # Decode inbound Frigate event. + return cls( + time=datetime.fromtimestamp(after["frame_time"], tz=timezone.utc), + camera=after["camera"], + label=after["sub_label"] or after["label"], + current_zones=after["current_zones"], + entered_zones=after["entered_zones"], + ) - def to_dict(self) -> t.Dict[str, str]: - data = dataclasses.asdict(self) - data = {k: v for (k, v) in data.items() if v is not None} - return data +ContainerType = t.Dict[str, t.Union[str, FrigateEvent]] -def frigate_events(topic, data, srv: Service): + +def frigate_events(topic: str, data: t.Dict[str, str], srv: Service) -> ContainerType: """ mqttwarn transformation function which computes options to be submitted to ntfy. """ - # Acceptable hack to get attachment filename template from service configuration. - context: RuntimeContext = srv.mwcore["context"] - service_config = context.get_service_config("ntfy") - filename_template = service_config.get("filename_template") - - # Decode JSON message. - after = json.loads(data['payload'])['after'] - - # Collect details from inbound Frigate event. - event = FrigateEvent( - time=datetime.fromtimestamp(after['frame_time'], tz=timezone.utc), - camera=after['camera'], - label=after['sub_label'] or after['label'], - current_zones=after['current_zones'], - entered_zones=after['entered_zones'], - ) - - # Interpolate event data into attachment filename template. - # attach_filename = filename_template.format(**event.to_dict()) - - # Compute parameters for outbound ntfy URL. - ntfy_parameters = NtfyParameters( - title=f"{event.label} entered {event.entered_zones_str} at {event.time}", - format=f"{event.label} was in {event.current_zones_str} before", - click=f"https://frigate.local/events?camera={event.camera}&label={event.label}&zone={event.entered_zones[0]}", - #attach=attach_filename, - ) - params = OrderedDict() + # Decode inbound Frigate event. + event = FrigateEvent.from_json(data["payload"]) + + # Collect outbound ntfy option fields. + params: ContainerType = OrderedDict() params.update(event.to_dict()) - params.update(ntfy_parameters.to_dict()) + + # Also add the event object as a whole, to let downstream templates leverage it. + params["event"] = event + return params -def frigate_events_filter(topic, message, section, srv: Service): +def frigate_events_filter(topic: str, payload: str, section: str, srv: Service) -> bool: """ mqttwarn filter function to only use `new` and important `update` Frigate events. @@ -110,14 +103,14 @@ def frigate_events_filter(topic, message, section, srv: Service): :return: True if message should be filtered, i.e. notification should be skipped. """ try: - message = json.loads(message) + message = json.loads(payload) except json.JSONDecodeError as e: srv.logging.warning(f"Can't parse Frigate event message: {e}") return True # ignore ending messages - message_type = message.get('type', None) - if message_type == 'end': + message_type = message.get("type") + if message_type == "end": srv.logging.warning(f"Frigate event skipped, ignoring Message type '{message_type}'") return True @@ -126,9 +119,9 @@ def frigate_events_filter(topic, message, section, srv: Service): srv.logging.warning("Frigate event skipped, 'after' missing from payload") return True - after = message.get('after') + after = message.get("after") - nonempty_fields = ['false_positive', 'camera', 'label', 'current_zones', 'entered_zones', 'frame_time'] + nonempty_fields = ["false_positive", "camera", "label", "current_zones", "entered_zones", "frame_time"] for field in nonempty_fields: # Validate field exists. @@ -156,14 +149,14 @@ def frigate_events_filter(topic, message, section, srv: Service): return True # Ignore unimportant `update` events. - before = message.get('before') - if message_type == 'update' and isinstance(before, dict): - if before.get('stationary') is True and after.get('stationary') is True: + before = message.get("before") + if message_type == "update" and isinstance(before, dict): + if before.get("stationary") is True and after.get("stationary") is True: srv.logging.warning("Frigate event skipped, object is stationary") return True - elif (after['current_zones'] == after['entered_zones'] or - (before['current_zones'] == after['current_zones'] and - before['entered_zones'] == after['entered_zones'])): + elif after["current_zones"] == after["entered_zones"] or ( + before["current_zones"] == after["current_zones"] and before["entered_zones"] == after["entered_zones"] + ): srv.logging.warning("Frigate event skipped, object stayed within same zone") return True @@ -172,8 +165,8 @@ def frigate_events_filter(topic, message, section, srv: Service): frigate_skip_rules = context.config.getdict(section, "frigate_skip_rules") for rule in frigate_skip_rules.values(): do_skip = True - for fieldname, skip_values in rule.items(): - actual_value = after[fieldname] + for field_name, skip_values in rule.items(): + actual_value = after[field_name] if isinstance(actual_value, list): do_skip = do_skip and all(value in skip_values for value in actual_value) else: @@ -185,7 +178,7 @@ def frigate_events_filter(topic, message, section, srv: Service): return False -def frigate_snapshot_decode_topic(topic, data, srv: Service): +def frigate_snapshot_decode_topic(topic: str, data: t.Dict[str, str], srv: Service) -> t.Optional[t.Dict[str, str]]: """ Decode Frigate MQTT topic for image snapshots. @@ -194,13 +187,15 @@ def frigate_snapshot_decode_topic(topic, data, srv: Service): See also: - https://docs.frigate.video/integrations/mqtt/#frigatecamera_nameobject_namesnapshot """ - if type(topic) == str: + topology = {} + if isinstance(topic, str): try: - pattern = r'^frigate/(?P.+?)/(?P.+?)/snapshot$' + # TODO: Compile pattern only once, for efficiency reasons. + pattern = r"^frigate/(?P.+?)/(?P.+?)/snapshot$" p = re.compile(pattern) m = p.match(topic) - topology = m.groupdict() + if m: + topology = m.groupdict() except: - topology = {} - return topology - return None + pass + return topology diff --git a/examples/frigate/test_frigate.py b/examples/frigate/test_frigate.py index 8d66ff87..eca22f5b 100644 --- a/examples/frigate/test_frigate.py +++ b/examples/frigate/test_frigate.py @@ -15,7 +15,6 @@ from mqttwarn.core import bootstrap, connect from tests.util import mqtt_process - if os.getenv("GITHUB_ACTIONS") == "true" and sys.platform != "linux": raise pytest.skip(msg="On GHA, Mosquitto via Docker is only available on Linux", allow_module_level=True) @@ -72,16 +71,16 @@ def test_frigate_event_new(mosquitto, ntfy_service, caplog, capmqtt): assert "Invoking service plugin for `ntfy'" in caplog.messages assert ( "Headers: {" + "'Click': 'https://httpbin.org/anything?camera=cam-testdrive&label=goat&zone=lawn', " "'Title': 'goat entered lawn at 2023-04-06 14:31:46.638857+00:00', " - "'Click': 'https://frigate.local/events?camera=cam-testdrive&label=goat&zone=lawn', " "'Message': 'goat was in barn before', " - "'Filename': 'mqttwarn-frigate-cam-testdrive-goat.png'}" - in caplog.messages + "'Filename': 'mqttwarn-frigate-cam-testdrive-goat.png'}" in caplog.messages ) assert ( "Sending notification to ntfy. target=test, options={" "'url': 'http://username:password@localhost:5555/frigate-testdrive', " - "'attachment': '/tmp/mqttwarn-frigate-{camera}-{label}.png'}" + "'attachment': '/tmp/mqttwarn-frigate-{camera}-{label}.png', " + "'click': 'https://httpbin.org/anything?camera={event.camera}&label={event.label}&zone={event.entered_zones[0]}'}" in caplog.messages ) @@ -134,11 +133,10 @@ def test_frigate_event_with_notification(mosquitto, ntfy_service, caplog, capmqt assert "Invoking service plugin for `ntfy'" in caplog.messages assert ( "Headers: {" + "'Click': 'https://httpbin.org/anything?camera=cam-testdrive&label=goat&zone=lawn', " "'Title': 'goat entered lawn at 2023-04-06 14:31:46.638857+00:00', " - "'Click': 'https://frigate.local/events?camera=cam-testdrive&label=goat&zone=lawn', " "'Message': 'goat was in barn before', " - "'Filename': 'mqttwarn-frigate-cam-testdrive-goat.png'}" - in caplog.messages + "'Filename': 'mqttwarn-frigate-cam-testdrive-goat.png'}" in caplog.messages ) # assert "Sent ntfy notification to 'http://localhost:5555'." in caplog.messages @@ -196,4 +194,6 @@ def get_goat_image() -> bytes: """ Get an image of a Changthangi goat. """ - return requests.get("https://user-images.githubusercontent.com/453543/231550862-5a64ac7c-bdfa-4509-86b8-b1a770899647.png").content + return requests.get( + "https://user-images.githubusercontent.com/453543/231550862-5a64ac7c-bdfa-4509-86b8-b1a770899647.png" + ).content diff --git a/pyproject.toml b/pyproject.toml index 9b213507..eb6bc79b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,6 +71,7 @@ files = [ "mqttwarn/core.py", "mqttwarn/services/ntfy.py", "tests/services/test_ntfy.py", + "examples/frigate/**/*.py", ]