Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-4067] Telegram hook/operator to post messages to telegram channels #4891

Closed
wants to merge 11 commits into from
117 changes: 117 additions & 0 deletions airflow/contrib/hooks/telegram_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import telegram
import time
from functools import wraps
from airflow.hooks.base_hook import BaseHook
from airflow.exceptions import AirflowException


def retry(exceptions, tries=4, delay=3, backoff=2, logger=None):
"""
Retry calling the decorated function using an exponential backoff.
(с) Eliot aka saltycrane, https://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/

Args:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use a reStructuredText docstring style.

exceptions: The exception to check. may be a tuple of
exceptions to check.
tries: Number of times to try (not retry) before giving up.
delay: Initial delay between retries in seconds.
backoff: Backoff multiplier (e.g. value of 2 will double the delay
each retry).
logger: Logger to use. If None, print.
"""

def deco_retry(f):
@wraps(f)
def f_retry(*args, **kwargs):
mtries, mdelay = tries, delay
while mtries > 1:
try:
return f(*args, **kwargs)
except exceptions as e:
msg = "{}, Retrying in {} seconds...".format(e, mdelay)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should avoid formatting string before passing to logger.
Reference: #4804

if logger:
logger.warning(msg)
else:
print(msg)
time.sleep(mdelay)
mtries -= 1
mdelay *= backoff
return f(*args, **kwargs)

return f_retry # true decorator

return deco_retry


class TelegramHook(BaseHook):
"""
Interact with Telegram, using python-telegram-bot library.
"""

def __init__(self, telegram_conn_id=None, chat_id=None):
"""
Takes both telegram bot API token directly and connection that has telegram bot API token.
If both supplied, telegram API token will be used.
dchaplinsky marked this conversation as resolved.
Show resolved Hide resolved

:param telegram_conn_id: connection that has telegram API token in the password field
:type telegram_conn_id: str
:param chat_id: Telegram public or private channel id (optional).
Check https://stackoverflow.com/a/33862907 to see how you can obtain chat_id for private
dchaplinsky marked this conversation as resolved.
Show resolved Hide resolved
channel
:type chat_id: str
"""
self.token = self.__get_token(telegram_conn_id)
self.chat_id = self.__get_chat_id(chat_id, telegram_conn_id)
self.connection = self.get_conn()

def get_conn(self):
return telegram.Bot(token=self.token)

def __get_token(self, telegram_conn_id):
if telegram_conn_id is not None:
conn = self.get_connection(telegram_conn_id)

if not conn.password:
raise AirflowException("Missing token(password) in Telegram connection")
return conn.password
else:
raise AirflowException(
"Cannot get token: " "No valid Telegram connection supplied."
)

def __get_chat_id(self, chat_id, telegram_conn_id):
if chat_id is not None:
return chat_id
elif telegram_conn_id is not None:
conn = self.get_connection(telegram_conn_id)

if not getattr(conn, "host", None):
raise AirflowException("Missing chat_id (host) in Telegram connection")
return conn.host
else:
raise AirflowException(
"Cannot get chat_id: " "No valid chat_id nor telegram_conn_id supplied."
)

@retry(exceptions=telegram.error.TelegramError, tries=5, delay=0)
def call(self, method, api_params):
"""
Send a message to a telegram channel

:param method: not used
:type method: str
:param api_params: params for telegram_instance.send_message. You can use it also to override chat_id
:type api_params: dict
"""

params = {
"chat_id": self.chat_id,
"parse_mode": telegram.ParseMode.HTML,
"disable_web_page_preview": True,
}
params.update(api_params)

self.log.info(self.connection.send_message(**params))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The full response should be at the debug level. At the info level, only the message that there is an attempt to send a message and that the message was sent.



__all__ = [TelegramHook]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
__all__ = [TelegramHook]
__all__ = ["TelegramHook"]

This should be a list of module names, not modules.

However, I would like to point out that we do not use the all variable in the whole project. I think that this is a good rule because it forces you to think about the whole project, not just small fragments. In the future, when Airflow will be divided into modules and interfaces between libraries will be important, we can think about changing this rule. Now we have one code base and we must think about its overall good.

Why is the decorator only in this module, not in the entire Airflow? What if another operator needs this decorator? Copy code? Move code? If we copy to another place, what will we do with the documentation? Can we change it? Did we respect code style of author?

@zhongjiajie What are you thinking?

56 changes: 56 additions & 0 deletions airflow/contrib/operators/telegram_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import json
import telegram

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
from hooks.telegram_hook import TelegramHook


class TelegramAPIOperator(BaseOperator):
"""
Telegram Operator

:param text: Templatable message
:type text: str
:param chat_id: Telegram channel ID
:type chat_id: str
:param telegram_conn_id: Telegram connection ID which its password is Telegram API token
:type telegram_conn_id: str
"""

template_fields = ("text", "chat_id")
ui_color = "#FFBA40"

@apply_defaults
def __init__(
self,
text="No message has been set.\n"
"Here is a cat video instead\n"
"https://www.youtube.com/watch?v=J---aiyznGQ",
chat_id=None,
telegram_conn_id=None,
dchaplinsky marked this conversation as resolved.
Show resolved Hide resolved
*args,
**kwargs
):
self.text = text
self.chat_id = chat_id

if telegram_conn_id is None:
raise AirflowException("No valid Telegram connection id supplied.")

self.telegram_conn_id = telegram_conn_id

super(TelegramAPIOperator, self).__init__(*args, **kwargs)

def execute(self, **kwargs):
"""
TelegramAPIOperator calls will not fail even if the call is not unsuccessful.
It should not prevent a DAG from completing in success
"""

try:
telegram_client = TelegramHook(telegram_conn_id=self.telegram_conn_id)
telegram_client.call("POST", {"text": self.text})
except Exception as e:
self.log.error("Cannot send a message to telegram, exception was '{}'".format(e))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exceptions should not be ignored.

dchaplinsky marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions docs/code.rst
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ Operators
.. autoclass:: airflow.contrib.operators.vertica_to_mysql.VerticaToMySqlTransfer
.. autoclass:: airflow.contrib.operators.wasb_delete_blob_operator.WasbDeleteBlobOperator
.. autoclass:: airflow.contrib.operators.winrm_operator.WinRMOperator
.. autoclass:: airflow.contrib.operators.telegram_operator.TelegramAPIOperator
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the class in alphabetical order. First, according to class names, and then according to the module path.


Sensors
^^^^^^^
Expand Down Expand Up @@ -536,6 +537,7 @@ Community contributed hooks
.. autoclass:: airflow.contrib.hooks.vertica_hook.VerticaHook
.. autoclass:: airflow.contrib.hooks.wasb_hook.WasbHook
.. autoclass:: airflow.contrib.hooks.winrm_hook.WinRMHook
.. autoclass:: airflow.contrib.hooks.telegram_hook.TelegramHook
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the class in alphabetical order. First, according to class names, and then according to the module path.


Executors
---------
Expand Down
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
'vertica_python',
'winrm',
'zdesk',
'telegram',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, keep alphabetical order.

]

# Hack to allow changing for piece of the code to behave differently while
Expand Down