-
Notifications
You must be signed in to change notification settings - Fork 14.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[AIRFLOW-4067] Telegram hook/operator to post messages to telegram channels #4891
Changes from 4 commits
aadaa11
a2278d2
af5a4b2
f3aa9a5
f05bb92
3213991
e47082a
54e23f7
ca5e9a0
7657d5d
25e59d7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,117 @@ | ||||||
import telegram | ||||||
import time | ||||||
from functools import wraps | ||||||
from airflow.hooks.base_hook import BaseHook | ||||||
from airflow.exceptions import AirflowException | ||||||
|
||||||
|
||||||
def retry(exceptions, tries=4, delay=3, backoff=2, logger=None): | ||||||
""" | ||||||
Retry calling the decorated function using an exponential backoff. | ||||||
(с) Eliot aka saltycrane, https://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/ | ||||||
|
||||||
Args: | ||||||
exceptions: The exception to check. may be a tuple of | ||||||
exceptions to check. | ||||||
tries: Number of times to try (not retry) before giving up. | ||||||
delay: Initial delay between retries in seconds. | ||||||
backoff: Backoff multiplier (e.g. value of 2 will double the delay | ||||||
each retry). | ||||||
logger: Logger to use. If None, print. | ||||||
""" | ||||||
|
||||||
def deco_retry(f): | ||||||
@wraps(f) | ||||||
def f_retry(*args, **kwargs): | ||||||
mtries, mdelay = tries, delay | ||||||
while mtries > 1: | ||||||
try: | ||||||
return f(*args, **kwargs) | ||||||
except exceptions as e: | ||||||
msg = "{}, Retrying in {} seconds...".format(e, mdelay) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should avoid formatting string before passing to logger. |
||||||
if logger: | ||||||
logger.warning(msg) | ||||||
else: | ||||||
print(msg) | ||||||
time.sleep(mdelay) | ||||||
mtries -= 1 | ||||||
mdelay *= backoff | ||||||
return f(*args, **kwargs) | ||||||
|
||||||
return f_retry # true decorator | ||||||
|
||||||
return deco_retry | ||||||
|
||||||
|
||||||
class TelegramHook(BaseHook): | ||||||
""" | ||||||
Interact with Telegram, using python-telegram-bot library. | ||||||
""" | ||||||
|
||||||
def __init__(self, telegram_conn_id=None, chat_id=None): | ||||||
""" | ||||||
Takes both telegram bot API token directly and connection that has telegram bot API token. | ||||||
If both supplied, telegram API token will be used. | ||||||
dchaplinsky marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
:param telegram_conn_id: connection that has telegram API token in the password field | ||||||
:type telegram_conn_id: str | ||||||
:param chat_id: Telegram public or private channel id (optional). | ||||||
Check https://stackoverflow.com/a/33862907 to see how you can obtain chat_id for private | ||||||
dchaplinsky marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
channel | ||||||
:type chat_id: str | ||||||
""" | ||||||
self.token = self.__get_token(telegram_conn_id) | ||||||
self.chat_id = self.__get_chat_id(chat_id, telegram_conn_id) | ||||||
self.connection = self.get_conn() | ||||||
|
||||||
def get_conn(self): | ||||||
return telegram.Bot(token=self.token) | ||||||
|
||||||
def __get_token(self, telegram_conn_id): | ||||||
if telegram_conn_id is not None: | ||||||
conn = self.get_connection(telegram_conn_id) | ||||||
|
||||||
if not conn.password: | ||||||
raise AirflowException("Missing token(password) in Telegram connection") | ||||||
return conn.password | ||||||
else: | ||||||
raise AirflowException( | ||||||
"Cannot get token: " "No valid Telegram connection supplied." | ||||||
) | ||||||
|
||||||
def __get_chat_id(self, chat_id, telegram_conn_id): | ||||||
if chat_id is not None: | ||||||
return chat_id | ||||||
elif telegram_conn_id is not None: | ||||||
conn = self.get_connection(telegram_conn_id) | ||||||
|
||||||
if not getattr(conn, "host", None): | ||||||
raise AirflowException("Missing chat_id (host) in Telegram connection") | ||||||
return conn.host | ||||||
else: | ||||||
raise AirflowException( | ||||||
"Cannot get chat_id: " "No valid chat_id nor telegram_conn_id supplied." | ||||||
) | ||||||
|
||||||
@retry(exceptions=telegram.error.TelegramError, tries=5, delay=0) | ||||||
def call(self, method, api_params): | ||||||
""" | ||||||
Send a message to a telegram channel | ||||||
|
||||||
:param method: not used | ||||||
:type method: str | ||||||
:param api_params: params for telegram_instance.send_message. You can use it also to override chat_id | ||||||
:type api_params: dict | ||||||
""" | ||||||
|
||||||
params = { | ||||||
"chat_id": self.chat_id, | ||||||
"parse_mode": telegram.ParseMode.HTML, | ||||||
"disable_web_page_preview": True, | ||||||
} | ||||||
params.update(api_params) | ||||||
|
||||||
self.log.info(self.connection.send_message(**params)) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The full response should be at the debug level. At the info level, only the message that there is an attempt to send a message and that the message was sent. |
||||||
|
||||||
|
||||||
__all__ = [TelegramHook] | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
This should be a list of module names, not modules. However, I would like to point out that we do not use the all variable in the whole project. I think that this is a good rule because it forces you to think about the whole project, not just small fragments. In the future, when Airflow will be divided into modules and interfaces between libraries will be important, we can think about changing this rule. Now we have one code base and we must think about its overall good. Why is the decorator only in this module, not in the entire Airflow? What if another operator needs this decorator? Copy code? Move code? If we copy to another place, what will we do with the documentation? Can we change it? Did we respect code style of author? @zhongjiajie What are you thinking? |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
import json | ||
import telegram | ||
|
||
from airflow.models import BaseOperator | ||
from airflow.utils.decorators import apply_defaults | ||
from airflow.exceptions import AirflowException | ||
from hooks.telegram_hook import TelegramHook | ||
|
||
|
||
class TelegramAPIOperator(BaseOperator): | ||
""" | ||
Telegram Operator | ||
|
||
:param text: Templatable message | ||
:type text: str | ||
:param chat_id: Telegram channel ID | ||
:type chat_id: str | ||
:param telegram_conn_id: Telegram connection ID which its password is Telegram API token | ||
:type telegram_conn_id: str | ||
""" | ||
|
||
template_fields = ("text", "chat_id") | ||
ui_color = "#FFBA40" | ||
|
||
@apply_defaults | ||
def __init__( | ||
self, | ||
text="No message has been set.\n" | ||
"Here is a cat video instead\n" | ||
"https://www.youtube.com/watch?v=J---aiyznGQ", | ||
chat_id=None, | ||
telegram_conn_id=None, | ||
dchaplinsky marked this conversation as resolved.
Show resolved
Hide resolved
|
||
*args, | ||
**kwargs | ||
): | ||
self.text = text | ||
self.chat_id = chat_id | ||
|
||
if telegram_conn_id is None: | ||
raise AirflowException("No valid Telegram connection id supplied.") | ||
|
||
self.telegram_conn_id = telegram_conn_id | ||
|
||
super(TelegramAPIOperator, self).__init__(*args, **kwargs) | ||
|
||
def execute(self, **kwargs): | ||
""" | ||
TelegramAPIOperator calls will not fail even if the call is not unsuccessful. | ||
It should not prevent a DAG from completing in success | ||
""" | ||
|
||
try: | ||
telegram_client = TelegramHook(telegram_conn_id=self.telegram_conn_id) | ||
telegram_client.call("POST", {"text": self.text}) | ||
except Exception as e: | ||
self.log.error("Cannot send a message to telegram, exception was '{}'".format(e)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exceptions should not be ignored.
dchaplinsky marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -278,6 +278,7 @@ Operators | |
.. autoclass:: airflow.contrib.operators.vertica_to_mysql.VerticaToMySqlTransfer | ||
.. autoclass:: airflow.contrib.operators.wasb_delete_blob_operator.WasbDeleteBlobOperator | ||
.. autoclass:: airflow.contrib.operators.winrm_operator.WinRMOperator | ||
.. autoclass:: airflow.contrib.operators.telegram_operator.TelegramAPIOperator | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add the class in alphabetical order. First, according to class names, and then according to the module path. |
||
|
||
Sensors | ||
^^^^^^^ | ||
|
@@ -536,6 +537,7 @@ Community contributed hooks | |
.. autoclass:: airflow.contrib.hooks.vertica_hook.VerticaHook | ||
.. autoclass:: airflow.contrib.hooks.wasb_hook.WasbHook | ||
.. autoclass:: airflow.contrib.hooks.winrm_hook.WinRMHook | ||
.. autoclass:: airflow.contrib.hooks.telegram_hook.TelegramHook | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add the class in alphabetical order. First, according to class names, and then according to the module path. |
||
|
||
Executors | ||
--------- | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -88,6 +88,7 @@ | |
'vertica_python', | ||
'winrm', | ||
'zdesk', | ||
'telegram', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please, keep alphabetical order. |
||
] | ||
|
||
# Hack to allow changing for piece of the code to behave differently while | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use a reStructuredText docstring style.