From 322c1df15b50207ad4d7f90d18ef1710a0777b0a Mon Sep 17 00:00:00 2001 From: Nathaniel Ritholtz Date: Fri, 5 Apr 2019 17:40:54 -0400 Subject: [PATCH] [AIRFLOW-4069] Add Opsgenie Alert Hook and Operator (#4903) --- airflow/contrib/hooks/opsgenie_alert_hook.py | 88 +++++++++++ .../operators/opsgenie_alert_operator.py | 131 +++++++++++++++++ airflow/utils/db.py | 4 + .../contrib/hooks/test_opsgenie_alert_hook.py | 137 ++++++++++++++++++ .../operators/test_opsgenie_alert_operator.py | 124 ++++++++++++++++ 5 files changed, 484 insertions(+) create mode 100644 airflow/contrib/hooks/opsgenie_alert_hook.py create mode 100644 airflow/contrib/operators/opsgenie_alert_operator.py create mode 100644 tests/contrib/hooks/test_opsgenie_alert_hook.py create mode 100644 tests/contrib/operators/test_opsgenie_alert_operator.py diff --git a/airflow/contrib/hooks/opsgenie_alert_hook.py b/airflow/contrib/hooks/opsgenie_alert_hook.py new file mode 100644 index 0000000000000..d576c08c23982 --- /dev/null +++ b/airflow/contrib/hooks/opsgenie_alert_hook.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import json + +import requests + +from airflow.hooks.http_hook import HttpHook +from airflow import AirflowException + + +class OpsgenieAlertHook(HttpHook): + """ + This hook allows you to post alerts to Opsgenie. + Accepts a connection that has an Opsgenie API key as the connection's password. + This hook sets the domain to conn_id.host, and if not set will default + to ``https://api.opsgenie.com``. + + Each Opsgenie API key can be pre-configured to a team integration. + You can override these defaults in this hook. + + :param opsgenie_conn_id: The name of the Opsgenie connection to use + :type opsgenie_conn_id: str + + """ + def __init__(self, + opsgenie_conn_id='opsgenie_default', + *args, + **kwargs + ): + super(OpsgenieAlertHook, self).__init__(http_conn_id=opsgenie_conn_id, *args, **kwargs) + + def _get_api_key(self): + """ + Get Opsgenie api_key for creating alert + """ + conn = self.get_connection(self.http_conn_id) + api_key = conn.password + if not api_key: + raise AirflowException('Opsgenie API Key is required for this hook, ' + 'please check your conn_id configuration.') + return api_key + + def get_conn(self, headers=None): + """ + Overwrite HttpHook get_conn because this hook just needs base_url + and headers, and does not need generic params + + :param headers: additional headers to be passed through as a dictionary + :type headers: dict + """ + conn = self.get_connection(self.http_conn_id) + self.base_url = conn.host if conn.host else 'https://api.opsgenie.com' + session = requests.Session() + if headers: + session.headers.update(headers) + return session + + def execute(self, payload={}): + """ + Execute the Opsgenie Alert call + + :param payload: Opsgenie API Create Alert payload values + See https://docs.opsgenie.com/docs/alert-api#section-create-alert + :type payload: dict + """ + api_key = self._get_api_key() + return self.run(endpoint='v2/alerts', + data=json.dumps(payload), + headers={'Content-Type': 'application/json', + 'Authorization': 'GenieKey %s' % api_key}) diff --git a/airflow/contrib/operators/opsgenie_alert_operator.py b/airflow/contrib/operators/opsgenie_alert_operator.py new file mode 100644 index 0000000000000..c46d234932d0d --- /dev/null +++ b/airflow/contrib/operators/opsgenie_alert_operator.py @@ -0,0 +1,131 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from airflow.contrib.hooks.opsgenie_alert_hook import OpsgenieAlertHook +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + + +class OpsgenieAlertOperator(BaseOperator): + """ + This operator allows you to post alerts to Opsgenie. + Accepts a connection that has an Opsgenie API key as the connection's password. + This operator sets the domain to conn_id.host, and if not set will default + to ``https://api.opsgenie.com``. + + Each Opsgenie API key can be pre-configured to a team integration. + You can override these defaults in this operator. + + :param opsgenie_conn_id: The name of the Opsgenie connection to use + :type opsgenie_conn_id: str + :param message: The Message of the Opsgenie alert (templated) + :type message: str + :param alias: Client-defined identifier of the alert (templated) + :type alias: str + :param description: Description field of the alert (templated) + :type description: str + :param responders: Teams, users, escalations and schedules that + the alert will be routed to send notifications. + :type responders: list[dict] + :param visibleTo: Teams and users that the alert will become visible + to without sending any notification. + :type visibleTo: list[dict] + :param actions: Custom actions that will be available for the alert. + :type actions: list[str] + :param tags: Tags of the alert. + :type tags: list[str] + :param details: Map of key-value pairs to use as custom properties of the alert. + :type details: dict + :param entity: Entity field of the alert that is + generally used to specify which domain alert is related to. (templated) + :type entity: str + :param source: Source field of the alert. Default value is + IP address of the incoming request. + :type source: str + :param priority: Priority level of the alert. Default value is P3. (templated) + :type priority: str + :param user: Display name of the request owner. + :type user: str + :param note: Additional note that will be added while creating the alert. (templated) + :type note: str + """ + template_fields = ('message', 'alias', 'description', 'entity', 'priority', 'note') + + @apply_defaults + def __init__(self, + message, + opsgenie_conn_id='opsgenie_default', + alias=None, + description=None, + responders=None, + visibleTo=None, + actions=None, + tags=None, + details=None, + entity=None, + source=None, + priority=None, + user=None, + note=None, + *args, + **kwargs + ): + super(OpsgenieAlertOperator, self).__init__(*args, **kwargs) + + self.message = message + self.opsgenie_conn_id = opsgenie_conn_id + self.alias = alias + self.description = description + self.responders = responders + self.visibleTo = visibleTo + self.actions = actions + self.tags = tags + self.details = details + self.entity = entity + self.source = source + self.priority = priority + self.user = user + self.note = note + self.hook = None + + def _build_opsgenie_payload(self): + """ + Construct the Opsgenie JSON payload. All relevant parameters are combined here + to a valid Opsgenie JSON payload. + + :return: Opsgenie payload (dict) to send + """ + payload = {} + + for key in [ + "message", "alias", "description", "responders", + "visibleTo", "actions", "tags", "details", "entity", + "source", "priority", "user", "note" + ]: + val = getattr(self, key) + if val: + payload[key] = val + return payload + + def execute(self, context): + """ + Call the OpsgenieAlertHook to post message + """ + self.hook = OpsgenieAlertHook(self.opsgenie_conn_id) + self.hook.execute(self._build_opsgenie_payload()) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index fef863b285b21..66143fdd6387e 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -286,6 +286,10 @@ def initdb(): Connection( conn_id='dingding_default', conn_type='http', host='', password='')) + merge_conn( + Connection( + conn_id='opsgenie_default', conn_type='http', + host='', password='')) dagbag = models.DagBag() # Save individual DAGs in the ORM diff --git a/tests/contrib/hooks/test_opsgenie_alert_hook.py b/tests/contrib/hooks/test_opsgenie_alert_hook.py new file mode 100644 index 0000000000000..be6bdf6fb9d8c --- /dev/null +++ b/tests/contrib/hooks/test_opsgenie_alert_hook.py @@ -0,0 +1,137 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import unittest +import requests_mock +import json + +from airflow import configuration +from airflow.models.connection import Connection +from airflow.utils import db +from airflow.contrib.hooks.opsgenie_alert_hook import OpsgenieAlertHook +from airflow import AirflowException + + +class TestOpsgenieAlertHook(unittest.TestCase): + conn_id = 'opsgenie_conn_id_test' + opsgenie_alert_endpoint = 'https://api.opsgenie.com/v2/alerts' + _payload = { + 'message': 'An example alert message', + 'alias': 'Life is too short for no alias', + 'description': 'Every alert needs a description', + 'responders': [ + {'id': '4513b7ea-3b91-438f-b7e4-e3e54af9147c', 'type': 'team'}, + {'name': 'NOC', 'type': 'team'}, + {'id': 'bb4d9938-c3c2-455d-aaab-727aa701c0d8', 'type': 'user'}, + {'username': 'trinity@opsgenie.com', 'type': 'user'}, + {'id': 'aee8a0de-c80f-4515-a232-501c0bc9d715', 'type': 'escalation'}, + {'name': 'Nightwatch Escalation', 'type': 'escalation'}, + {'id': '80564037-1984-4f38-b98e-8a1f662df552', 'type': 'schedule'}, + {'name': 'First Responders Schedule', 'type': 'schedule'} + ], + 'visibleTo': [ + {'id': '4513b7ea-3b91-438f-b7e4-e3e54af9147c', 'type': 'team'}, + {'name': 'rocket_team', 'type': 'team'}, + {'id': 'bb4d9938-c3c2-455d-aaab-727aa701c0d8', 'type': 'user'}, + {'username': 'trinity@opsgenie.com', 'type': 'user'} + ], + 'actions': ['Restart', 'AnExampleAction'], + 'tags': ['OverwriteQuietHours', 'Critical'], + 'details': {'key1': 'value1', 'key2': 'value2'}, + 'entity': 'An example entity', + 'source': 'Airflow', + 'priority': 'P1', + 'user': 'Jesse', + 'note': 'Write this down' + } + _mock_success_response_body = { + "result": "Request will be processed", + "took": 0.302, + "requestId": "43a29c5c-3dbf-4fa4-9c26-f4f71023e120" + } + + def setUp(self): + configuration.load_test_config() + db.merge_conn( + Connection( + conn_id=self.conn_id, + host='https://api.opsgenie.com/', + password='eb243592-faa2-4ba2-a551q-1afdf565c889' + ) + ) + + def test_get_api_key(self): + hook = OpsgenieAlertHook(opsgenie_conn_id=self.conn_id) + api_key = hook._get_api_key() + self.assertEqual('eb243592-faa2-4ba2-a551q-1afdf565c889', api_key) + + def test_get_conn_defaults_host(self): + hook = OpsgenieAlertHook() + hook.get_conn() + self.assertEqual('https://api.opsgenie.com', hook.base_url) + + @requests_mock.mock() + def test_call_with_success(self, m): + hook = OpsgenieAlertHook(opsgenie_conn_id=self.conn_id) + m.post( + self.opsgenie_alert_endpoint, + status_code=202, + json=self._mock_success_response_body + ) + resp = hook.execute(payload=self._payload) + self.assertEqual(resp.status_code, 202) + self.assertEqual(resp.json(), self._mock_success_response_body) + + @requests_mock.mock() + def test_api_key_set(self, m): + hook = OpsgenieAlertHook(opsgenie_conn_id=self.conn_id) + m.post( + self.opsgenie_alert_endpoint, + status_code=202, + json=self._mock_success_response_body + ) + resp = hook.execute(payload=self._payload) + self.assertEqual(resp.request.headers.get('Authorization'), + 'GenieKey eb243592-faa2-4ba2-a551q-1afdf565c889') + + @requests_mock.mock() + def test_api_key_not_set(self, m): + hook = OpsgenieAlertHook() + m.post( + self.opsgenie_alert_endpoint, + status_code=202, + json=self._mock_success_response_body + ) + with self.assertRaises(AirflowException): + hook.execute(payload=self._payload) + + @requests_mock.mock() + def test_payload_set(self, m): + hook = OpsgenieAlertHook(opsgenie_conn_id=self.conn_id) + m.post( + self.opsgenie_alert_endpoint, + status_code=202, + json=self._mock_success_response_body + ) + resp = hook.execute(payload=self._payload) + self.assertEqual(json.loads(resp.request.body), self._payload) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/contrib/operators/test_opsgenie_alert_operator.py b/tests/contrib/operators/test_opsgenie_alert_operator.py new file mode 100644 index 0000000000000..1b4467bc5a523 --- /dev/null +++ b/tests/contrib/operators/test_opsgenie_alert_operator.py @@ -0,0 +1,124 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import unittest + +from airflow import DAG, configuration + +from airflow.contrib.operators.opsgenie_alert_operator import OpsgenieAlertOperator +from airflow.utils import timezone + +DEFAULT_DATE = timezone.datetime(2017, 1, 1) + + +class TestOpsgenieAlertOperator(unittest.TestCase): + _config = { + 'message': 'An example alert message', + 'alias': 'Life is too short for no alias', + 'description': 'Every alert needs a description', + 'responders': [ + {'id': '4513b7ea-3b91-438f-b7e4-e3e54af9147c', 'type': 'team'}, + {'name': 'NOC', 'type': 'team'}, + {'id': 'bb4d9938-c3c2-455d-aaab-727aa701c0d8', 'type': 'user'}, + {'username': 'trinity@opsgenie.com', 'type': 'user'}, + {'id': 'aee8a0de-c80f-4515-a232-501c0bc9d715', 'type': 'escalation'}, + {'name': 'Nightwatch Escalation', 'type': 'escalation'}, + {'id': '80564037-1984-4f38-b98e-8a1f662df552', 'type': 'schedule'}, + {'name': 'First Responders Schedule', 'type': 'schedule'} + ], + 'visibleTo': [ + {'id': '4513b7ea-3b91-438f-b7e4-e3e54af9147c', 'type': 'team'}, + {'name': 'rocket_team', 'type': 'team'}, + {'id': 'bb4d9938-c3c2-455d-aaab-727aa701c0d8', 'type': 'user'}, + {'username': 'trinity@opsgenie.com', 'type': 'user'} + ], + 'actions': ['Restart', 'AnExampleAction'], + 'tags': ['OverwriteQuietHours', 'Critical'], + 'details': {'key1': 'value1', 'key2': 'value2'}, + 'entity': 'An example entity', + 'source': 'Airflow', + 'priority': 'P1', + 'user': 'Jesse', + 'note': 'Write this down' + } + + expected_payload_dict = { + 'message': _config['message'], + 'alias': _config['alias'], + 'description': _config['description'], + 'responders': _config['responders'], + 'visibleTo': _config['visibleTo'], + 'actions': _config['actions'], + 'tags': _config['tags'], + 'details': _config['details'], + 'entity': _config['entity'], + 'source': _config['source'], + 'priority': _config['priority'], + 'user': _config['user'], + 'note': _config['note'] + } + + def setUp(self): + configuration.load_test_config() + args = { + 'owner': 'airflow', + 'start_date': DEFAULT_DATE + } + self.dag = DAG('test_dag_id', default_args=args) + + def test_build_opsgenie_payload(self): + # Given / When + operator = OpsgenieAlertOperator( + task_id='opsgenie_alert_job', + dag=self.dag, + **self._config + ) + + payload = operator._build_opsgenie_payload() + + # Then + self.assertEqual(self.expected_payload_dict, payload) + + def test_properties(self): + # Given / When + operator = OpsgenieAlertOperator( + task_id='opsgenie_alert_job', + dag=self.dag, + **self._config + ) + + self.assertEqual('opsgenie_default', operator.opsgenie_conn_id) + self.assertEqual(self._config['message'], operator.message) + self.assertEqual(self._config['alias'], operator.alias) + self.assertEqual(self._config['description'], operator.description) + self.assertEqual(self._config['responders'], operator.responders) + self.assertEqual(self._config['visibleTo'], operator.visibleTo) + self.assertEqual(self._config['actions'], operator.actions) + self.assertEqual(self._config['tags'], operator.tags) + self.assertEqual(self._config['details'], operator.details) + self.assertEqual(self._config['entity'], operator.entity) + self.assertEqual(self._config['source'], operator.source) + self.assertEqual(self._config['priority'], operator.priority) + self.assertEqual(self._config['user'], operator.user) + self.assertEqual(self._config['note'], operator.note) + + +if __name__ == '__main__': + unittest.main()