Skip to content

Commit

Permalink
[AIRFLOW-4069] Add Opsgenie Alert Hook and Operator (#4903)
Browse files Browse the repository at this point in the history
  • Loading branch information
nritholtz authored and ashb committed Apr 5, 2019
1 parent 9988fdb commit d949340
Show file tree
Hide file tree
Showing 5 changed files with 484 additions and 0 deletions.
88 changes: 88 additions & 0 deletions airflow/contrib/hooks/opsgenie_alert_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

import json

import requests

from airflow.hooks.http_hook import HttpHook
from airflow import AirflowException


class OpsgenieAlertHook(HttpHook):
"""
This hook allows you to post alerts to Opsgenie.
Accepts a connection that has an Opsgenie API key as the connection's password.
This hook sets the domain to conn_id.host, and if not set will default
to ``https://api.opsgenie.com``.
Each Opsgenie API key can be pre-configured to a team integration.
You can override these defaults in this hook.
:param opsgenie_conn_id: The name of the Opsgenie connection to use
:type opsgenie_conn_id: str
"""
def __init__(self,
opsgenie_conn_id='opsgenie_default',
*args,
**kwargs
):
super(OpsgenieAlertHook, self).__init__(http_conn_id=opsgenie_conn_id, *args, **kwargs)

def _get_api_key(self):
"""
Get Opsgenie api_key for creating alert
"""
conn = self.get_connection(self.http_conn_id)
api_key = conn.password
if not api_key:
raise AirflowException('Opsgenie API Key is required for this hook, '
'please check your conn_id configuration.')
return api_key

def get_conn(self, headers=None):
"""
Overwrite HttpHook get_conn because this hook just needs base_url
and headers, and does not need generic params
:param headers: additional headers to be passed through as a dictionary
:type headers: dict
"""
conn = self.get_connection(self.http_conn_id)
self.base_url = conn.host if conn.host else 'https://api.opsgenie.com'
session = requests.Session()
if headers:
session.headers.update(headers)
return session

def execute(self, payload={}):
"""
Execute the Opsgenie Alert call
:param payload: Opsgenie API Create Alert payload values
See https://docs.opsgenie.com/docs/alert-api#section-create-alert
:type payload: dict
"""
api_key = self._get_api_key()
return self.run(endpoint='v2/alerts',
data=json.dumps(payload),
headers={'Content-Type': 'application/json',
'Authorization': 'GenieKey %s' % api_key})
131 changes: 131 additions & 0 deletions airflow/contrib/operators/opsgenie_alert_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from airflow.contrib.hooks.opsgenie_alert_hook import OpsgenieAlertHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class OpsgenieAlertOperator(BaseOperator):
"""
This operator allows you to post alerts to Opsgenie.
Accepts a connection that has an Opsgenie API key as the connection's password.
This operator sets the domain to conn_id.host, and if not set will default
to ``https://api.opsgenie.com``.
Each Opsgenie API key can be pre-configured to a team integration.
You can override these defaults in this operator.
:param opsgenie_conn_id: The name of the Opsgenie connection to use
:type opsgenie_conn_id: str
:param message: The Message of the Opsgenie alert (templated)
:type message: str
:param alias: Client-defined identifier of the alert (templated)
:type alias: str
:param description: Description field of the alert (templated)
:type description: str
:param responders: Teams, users, escalations and schedules that
the alert will be routed to send notifications.
:type responders: list[dict]
:param visibleTo: Teams and users that the alert will become visible
to without sending any notification.
:type visibleTo: list[dict]
:param actions: Custom actions that will be available for the alert.
:type actions: list[str]
:param tags: Tags of the alert.
:type tags: list[str]
:param details: Map of key-value pairs to use as custom properties of the alert.
:type details: dict
:param entity: Entity field of the alert that is
generally used to specify which domain alert is related to. (templated)
:type entity: str
:param source: Source field of the alert. Default value is
IP address of the incoming request.
:type source: str
:param priority: Priority level of the alert. Default value is P3. (templated)
:type priority: str
:param user: Display name of the request owner.
:type user: str
:param note: Additional note that will be added while creating the alert. (templated)
:type note: str
"""
template_fields = ('message', 'alias', 'description', 'entity', 'priority', 'note')

@apply_defaults
def __init__(self,
message,
opsgenie_conn_id='opsgenie_default',
alias=None,
description=None,
responders=None,
visibleTo=None,
actions=None,
tags=None,
details=None,
entity=None,
source=None,
priority=None,
user=None,
note=None,
*args,
**kwargs
):
super(OpsgenieAlertOperator, self).__init__(*args, **kwargs)

self.message = message
self.opsgenie_conn_id = opsgenie_conn_id
self.alias = alias
self.description = description
self.responders = responders
self.visibleTo = visibleTo
self.actions = actions
self.tags = tags
self.details = details
self.entity = entity
self.source = source
self.priority = priority
self.user = user
self.note = note
self.hook = None

def _build_opsgenie_payload(self):
"""
Construct the Opsgenie JSON payload. All relevant parameters are combined here
to a valid Opsgenie JSON payload.
:return: Opsgenie payload (dict) to send
"""
payload = {}

for key in [
"message", "alias", "description", "responders",
"visibleTo", "actions", "tags", "details", "entity",
"source", "priority", "user", "note"
]:
val = getattr(self, key)
if val:
payload[key] = val
return payload

def execute(self, context):
"""
Call the OpsgenieAlertHook to post message
"""
self.hook = OpsgenieAlertHook(self.opsgenie_conn_id)
self.hook.execute(self._build_opsgenie_payload())
4 changes: 4 additions & 0 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,10 @@ def initdb(rbac=False):
Connection(
conn_id='dingding_default', conn_type='http',
host='', password=''))
merge_conn(
Connection(
conn_id='opsgenie_default', conn_type='http',
host='', password=''))

# Known event types
KET = models.KnownEventType
Expand Down
137 changes: 137 additions & 0 deletions tests/contrib/hooks/test_opsgenie_alert_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import unittest
import requests_mock
import json

from airflow import configuration
from airflow.models.connection import Connection
from airflow.utils import db
from airflow.contrib.hooks.opsgenie_alert_hook import OpsgenieAlertHook
from airflow import AirflowException


class TestOpsgenieAlertHook(unittest.TestCase):
conn_id = 'opsgenie_conn_id_test'
opsgenie_alert_endpoint = 'https://api.opsgenie.com/v2/alerts'
_payload = {
'message': 'An example alert message',
'alias': 'Life is too short for no alias',
'description': 'Every alert needs a description',
'responders': [
{'id': '4513b7ea-3b91-438f-b7e4-e3e54af9147c', 'type': 'team'},
{'name': 'NOC', 'type': 'team'},
{'id': 'bb4d9938-c3c2-455d-aaab-727aa701c0d8', 'type': 'user'},
{'username': 'trinity@opsgenie.com', 'type': 'user'},
{'id': 'aee8a0de-c80f-4515-a232-501c0bc9d715', 'type': 'escalation'},
{'name': 'Nightwatch Escalation', 'type': 'escalation'},
{'id': '80564037-1984-4f38-b98e-8a1f662df552', 'type': 'schedule'},
{'name': 'First Responders Schedule', 'type': 'schedule'}
],
'visibleTo': [
{'id': '4513b7ea-3b91-438f-b7e4-e3e54af9147c', 'type': 'team'},
{'name': 'rocket_team', 'type': 'team'},
{'id': 'bb4d9938-c3c2-455d-aaab-727aa701c0d8', 'type': 'user'},
{'username': 'trinity@opsgenie.com', 'type': 'user'}
],
'actions': ['Restart', 'AnExampleAction'],
'tags': ['OverwriteQuietHours', 'Critical'],
'details': {'key1': 'value1', 'key2': 'value2'},
'entity': 'An example entity',
'source': 'Airflow',
'priority': 'P1',
'user': 'Jesse',
'note': 'Write this down'
}
_mock_success_response_body = {
"result": "Request will be processed",
"took": 0.302,
"requestId": "43a29c5c-3dbf-4fa4-9c26-f4f71023e120"
}

def setUp(self):
configuration.load_test_config()
db.merge_conn(
Connection(
conn_id=self.conn_id,
host='https://api.opsgenie.com/',
password='eb243592-faa2-4ba2-a551q-1afdf565c889'
)
)

def test_get_api_key(self):
hook = OpsgenieAlertHook(opsgenie_conn_id=self.conn_id)
api_key = hook._get_api_key()
self.assertEqual('eb243592-faa2-4ba2-a551q-1afdf565c889', api_key)

def test_get_conn_defaults_host(self):
hook = OpsgenieAlertHook()
hook.get_conn()
self.assertEqual('https://api.opsgenie.com', hook.base_url)

@requests_mock.mock()
def test_call_with_success(self, m):
hook = OpsgenieAlertHook(opsgenie_conn_id=self.conn_id)
m.post(
self.opsgenie_alert_endpoint,
status_code=202,
json=self._mock_success_response_body
)
resp = hook.execute(payload=self._payload)
self.assertEqual(resp.status_code, 202)
self.assertEqual(resp.json(), self._mock_success_response_body)

@requests_mock.mock()
def test_api_key_set(self, m):
hook = OpsgenieAlertHook(opsgenie_conn_id=self.conn_id)
m.post(
self.opsgenie_alert_endpoint,
status_code=202,
json=self._mock_success_response_body
)
resp = hook.execute(payload=self._payload)
self.assertEqual(resp.request.headers.get('Authorization'),
'GenieKey eb243592-faa2-4ba2-a551q-1afdf565c889')

@requests_mock.mock()
def test_api_key_not_set(self, m):
hook = OpsgenieAlertHook()
m.post(
self.opsgenie_alert_endpoint,
status_code=202,
json=self._mock_success_response_body
)
with self.assertRaises(AirflowException):
hook.execute(payload=self._payload)

@requests_mock.mock()
def test_payload_set(self, m):
hook = OpsgenieAlertHook(opsgenie_conn_id=self.conn_id)
m.post(
self.opsgenie_alert_endpoint,
status_code=202,
json=self._mock_success_response_body
)
resp = hook.execute(payload=self._payload)
self.assertEqual(json.loads(resp.request.body), self._payload)


if __name__ == '__main__':
unittest.main()
Loading

0 comments on commit d949340

Please sign in to comment.