Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-4067] Telegram hook/operator to post messages to telegram channels #4891

Closed
wants to merge 11 commits into from
101 changes: 101 additions & 0 deletions airflow/contrib/hooks/telegram_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import telegram
import tenacity

from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException


class TelegramHook(BaseHook):
"""
Interact with Telegram, using python-telegram-bot library.
"""

def __init__(self, telegram_conn_id=None, chat_id=None):
"""
Takes both telegram bot API token directly and connection that has telegram bot API token.
If both supplied, telegram API token will be used.
dchaplinsky marked this conversation as resolved.
Show resolved Hide resolved

:param telegram_conn_id: connection that has telegram API token in the password field
:type telegram_conn_id: str
:param chat_id: Telegram public or private channel id (optional).
Check https://stackoverflow.com/a/33862907 to see how you can obtain chat_id for private
dchaplinsky marked this conversation as resolved.
Show resolved Hide resolved
channel
:type chat_id: str
"""
self.token = self.__get_token(telegram_conn_id)
self.chat_id = self.__get_chat_id(chat_id, telegram_conn_id)
self.connection = self.get_conn()

def get_conn(self):
return telegram.Bot(token=self.token)

def __get_token(self, telegram_conn_id):
if telegram_conn_id is not None:
conn = self.get_connection(telegram_conn_id)

if not conn.password:
raise AirflowException("Missing token(password) in Telegram connection")
return conn.password
else:
raise AirflowException(
"Cannot get token: " "No valid Telegram connection supplied."
)

def __get_chat_id(self, chat_id, telegram_conn_id):
if chat_id is not None:
return chat_id
elif telegram_conn_id is not None:
conn = self.get_connection(telegram_conn_id)

if not getattr(conn, "host", None):
raise AirflowException("Missing chat_id (host) in Telegram connection")
return conn.host
else:
raise AirflowException(
"Cannot get chat_id: " "No valid chat_id nor telegram_conn_id supplied."
)

@tenacity.retry(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

retry=tenacity.retry_if_exception_type(telegram.error.TelegramError),
stop=tenacity.stop_after_attempt(5),
wait=tenacity.wait_fixed(1),
)
def call(self, method, api_params):
"""
Send a message to a telegram channel

:param method: not used
:type method: str
:param api_params: params for telegram_instance.send_message. You can use it also to override chat_id
:type api_params: dict
"""

params = {
"chat_id": self.chat_id,
"parse_mode": telegram.ParseMode.HTML,
"disable_web_page_preview": True,
}
params.update(api_params)
self.log.debug(self.connection.send_message(**params))


__all__ = ["TelegramHook"]
50 changes: 50 additions & 0 deletions airflow/contrib/operators/telegram_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
from airflow.contrib.hooks.telegram_hook import TelegramHook


class TelegramAPIOperator(BaseOperator):
"""
Telegram Operator

:param text: Templatable message
:type text: str
:param chat_id: Telegram channel ID
:type chat_id: str
:param telegram_conn_id: Telegram connection ID which its password is Telegram API token
:type telegram_conn_id: str
"""

template_fields = ("text", "chat_id")
ui_color = "#FFBA40"

@apply_defaults
def __init__(
self,
text="No message has been set.\n"
"Here is a cat video instead\n"
"https://www.youtube.com/watch?v=J---aiyznGQ",
chat_id=None,
telegram_conn_id="telegram_default",
*args,
**kwargs
):
self.text = text
self.chat_id = chat_id

if telegram_conn_id is None:
raise AirflowException("No valid Telegram connection id supplied.")

self.telegram_conn_id = telegram_conn_id

super(TelegramAPIOperator, self).__init__(*args, **kwargs)

def execute(self, **kwargs):
"""
TelegramAPIOperator calls will not fail even if the call is not unsuccessful.
It should not prevent a DAG from completing in success
"""

telegram_client = TelegramHook(telegram_conn_id=self.telegram_conn_id)
telegram_client.call("POST", {"text": self.text})
2 changes: 2 additions & 0 deletions docs/code.rst
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ Operators
.. autoclass:: airflow.contrib.operators.vertica_to_mysql.VerticaToMySqlTransfer
.. autoclass:: airflow.contrib.operators.wasb_delete_blob_operator.WasbDeleteBlobOperator
.. autoclass:: airflow.contrib.operators.winrm_operator.WinRMOperator
.. autoclass:: airflow.contrib.operators.telegram_operator.TelegramAPIOperator
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the class in alphabetical order. First, according to class names, and then according to the module path.


Sensors
^^^^^^^
Expand Down Expand Up @@ -551,6 +552,7 @@ Community contributed hooks
.. autoclass:: airflow.contrib.hooks.vertica_hook.VerticaHook
.. autoclass:: airflow.contrib.hooks.wasb_hook.WasbHook
.. autoclass:: airflow.contrib.hooks.winrm_hook.WinRMHook
.. autoclass:: airflow.contrib.hooks.telegram_hook.TelegramHook
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the class in alphabetical order. First, according to class names, and then according to the module path.


Executors
---------
Expand Down
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
'vertica_python',
'winrm',
'zdesk',
'telegram',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, keep alphabetical order.

]

# Hack to allow changing for piece of the code to behave differently while
Expand Down