Skip to content

Commit

Permalink
Add different modes to sort dag files for parsing (apache#15046)
Browse files Browse the repository at this point in the history
This commit adds the feature to allow users to set one of the following modes, the
 scheduler will list and sort the dag files to decide the parsing order.:

- `modified_time`: Sort by modified time of the files. This is useful on large scale to parse the recently modified DAGs first.
- `random_seeded_by_host`: Sort randomly across multiple Schedulers but with same order on the same host. This is useful when running with Scheduler in HA mode where each scheduler can parse different DAG files.
- `alphabetical`: Sort by filename
  • Loading branch information
kaxil authored Mar 29, 2021
1 parent 57388ef commit 2e3eb42
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 7 deletions.
16 changes: 16 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1834,6 +1834,22 @@
type: string
example: ~
default: "2"
- name: file_parsing_sort_mode
description: |
One of ``modified_time``, ``random_seeded_by_host`` and ``alphabetical``.
The scheduler will list and sort the dag files to decide the parsing order.
* ``modified_time``: Sort by modified time of the files. This is useful on large scale to parse the
recently modified DAGs first.
* ``random_seeded_by_host``: Sort randomly across multiple Schedulers but with same order on the
same host. This is useful when running with Scheduler in HA mode where each scheduler can
parse different DAG files.
* ``alphabetical``: Sort by filename
version_added: 2.1.0
type: string
example: ~
default: "modified_time"
- name: use_job_schedule
description: |
Turn off scheduler use of cron intervals by setting this to False.
Expand Down
11 changes: 11 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,17 @@ use_row_level_locking = True
# This defines how many processes will run.
parsing_processes = 2

# One of ``modified_time``, ``random_seeded_by_host`` and ``alphabetical``.
# The scheduler will list and sort the dag files to decide the parsing order.
#
# * ``modified_time``: Sort by modified time of the files. This is useful on large scale to parse the
# recently modified DAGs first.
# * ``random_seeded_by_host``: Sort randomly across multiple Schedulers but with same order on the
# same host. This is useful when running with Scheduler in HA mode where each scheduler can
# parse different DAG files.
# * ``alphabetical``: Sort by filename
file_parsing_sort_mode = modified_time

# Turn off scheduler use of cron intervals by setting this to False.
# DAGs submitted manually in the web UI or with trigger_dag will still run.
use_job_schedule = True
Expand Down
10 changes: 10 additions & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,16 @@ def _validate_config_dependencies(self):
+ ", ".join(start_method_options)
)

if self.has_option("scheduler", "file_parsing_sort_mode"):
list_mode = self.get("scheduler", "file_parsing_sort_mode")
file_parser_modes = {"modified_time", "random_seeded_by_host", "alphabetical"}

if list_mode not in file_parser_modes:
raise AirflowConfigException(
"`[scheduler] file_parsing_sort_mode` should not be "
+ f"{list_mode}. Possible values are {', '.join(file_parser_modes)}."
)

def _using_old_value(self, old, current_value): # noqa
return old.search(current_value) is not None

Expand Down
40 changes: 35 additions & 5 deletions airflow/utils/dag_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import logging
import multiprocessing
import os
import random
import signal
import sys
import time
Expand All @@ -48,6 +49,7 @@
from airflow.utils.file import list_py_file_paths
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.mixins import MultiprocessingStartMethodMixin
from airflow.utils.net import get_hostname
from airflow.utils.process_utils import kill_child_processes_by_pids, reap_process_group
from airflow.utils.session import provide_session
from airflow.utils.state import State
Expand Down Expand Up @@ -1017,26 +1019,54 @@ def prepare_file_path_queue(self):
# processed recently, wait until the next batch
file_paths_in_progress = self._processors.keys()
now = timezone.utcnow()

# Sort the file paths by the parsing order mode
list_mode = conf.get("scheduler", "file_parsing_sort_mode")

files_with_mtime = {}
file_paths = []
is_mtime_mode = list_mode == "modified_time"

file_paths_recently_processed = []
for file_path in self._file_paths:

if is_mtime_mode:
files_with_mtime[file_path] = os.path.getmtime(file_path)
else:
file_paths.append(file_path)

# Find file paths that were recently processed
last_finish_time = self.get_last_finish_time(file_path)
if (
last_finish_time is not None
and (now - last_finish_time).total_seconds() < self._file_process_interval
):
file_paths_recently_processed.append(file_path)

# Sort file paths via last modified time
if is_mtime_mode:
file_paths = sorted(files_with_mtime, key=files_with_mtime.get, reverse=True)
elif list_mode == "alphabetical":
file_paths = sorted(file_paths)
elif list_mode == "random_seeded_by_host":
# Shuffle the list seeded by hostname so multiple schedulers can work on different
# set of files. Since we set the seed, the sort order will remain same per host
random.Random(get_hostname()).shuffle(file_paths)

files_paths_at_run_limit = [
file_path for file_path, stat in self._file_stats.items() if stat.run_count == self._max_runs
]

files_paths_to_queue = list(
set(self._file_paths)
- set(file_paths_in_progress)
- set(file_paths_recently_processed)
- set(files_paths_at_run_limit)
file_paths_to_exclude = set(file_paths_in_progress).union(
file_paths_recently_processed, files_paths_at_run_limit
)

# Do not convert the following list to set as set does not preserve the order
# and we need to maintain the order of file_paths for `[scheduler] file_parsing_sort_mode`
files_paths_to_queue = [
file_path for file_path in file_paths if file_path not in file_paths_to_exclude
]

for file_path, processor in self._processors.items():
self.log.debug(
"File path %s is still being processed (started: %s)",
Expand Down
8 changes: 6 additions & 2 deletions airflow/utils/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def list_py_file_paths(
elif os.path.isfile(directory):
file_paths = [directory]
elif os.path.isdir(directory):
find_dag_file_paths(directory, file_paths, safe_mode)
file_paths.extend(find_dag_file_paths(directory, safe_mode))
if include_examples:
from airflow import example_dags

Expand All @@ -175,8 +175,10 @@ def list_py_file_paths(
return file_paths


def find_dag_file_paths(directory: str, file_paths: list, safe_mode: bool):
def find_dag_file_paths(directory: str, safe_mode: bool) -> List[str]:
"""Finds file paths of all DAG files."""
file_paths = []

for file_path in find_path_from_directory(directory, ".airflowignore"):
try:
if not os.path.isfile(file_path):
Expand All @@ -191,6 +193,8 @@ def find_dag_file_paths(directory: str, file_paths: list, safe_mode: bool):
except Exception: # noqa pylint: disable=broad-except
log.exception("Error while examining %s", file_path)

return file_paths


COMMENT_PATTERN = re.compile(r"\s*#.*")

Expand Down
97 changes: 97 additions & 0 deletions tests/utils/test_dag_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import multiprocessing
import os
import random
import sys
import unittest
from datetime import datetime, timedelta
Expand All @@ -42,6 +43,7 @@
DagParsingSignal,
DagParsingStat,
)
from airflow.utils.net import get_hostname
from airflow.utils.session import create_session
from airflow.utils.state import State
from tests.core.test_logging_config import SETTINGS_FILE_VALID, settings_context
Expand Down Expand Up @@ -223,6 +225,101 @@ def test_set_file_paths_when_processor_file_path_is_in_new_file_paths(self):
manager.set_file_paths(['abc.txt'])
assert manager._processors == {'abc.txt': mock_processor}

@conf_vars({("scheduler", "file_parsing_sort_mode"): "alphabetical"})
@mock.patch("zipfile.is_zipfile", return_value=True)
@mock.patch("airflow.utils.file.might_contain_dag", return_value=True)
@mock.patch("airflow.utils.file.find_path_from_directory", return_value=True)
@mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
def test_file_paths_in_queue_sorted_alphabetically(
self, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
):
"""Test dag files are sorted alphabetically"""
dag_files = ["file_3.py", "file_2.py", "file_4.py", "file_1.py"]
mock_find_path.return_value = dag_files

manager = DagFileProcessorManager(
dag_directory='directory',
max_runs=1,
processor_factory=MagicMock().return_value,
processor_timeout=timedelta.max,
signal_conn=MagicMock(),
dag_ids=[],
pickle_dags=False,
async_mode=True,
)

manager.set_file_paths(dag_files)
assert manager._file_path_queue == []
manager.prepare_file_path_queue()
assert manager._file_path_queue == ['file_1.py', 'file_2.py', 'file_3.py', 'file_4.py']

@conf_vars({("scheduler", "file_parsing_sort_mode"): "random_seeded_by_host"})
@mock.patch("zipfile.is_zipfile", return_value=True)
@mock.patch("airflow.utils.file.might_contain_dag", return_value=True)
@mock.patch("airflow.utils.file.find_path_from_directory", return_value=True)
@mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
def test_file_paths_in_queue_sorted_random_seeded_by_host(
self, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
):
"""Test files are randomly sorted and seeded by host name"""
dag_files = ["file_3.py", "file_2.py", "file_4.py", "file_1.py"]
mock_find_path.return_value = dag_files

manager = DagFileProcessorManager(
dag_directory='directory',
max_runs=1,
processor_factory=MagicMock().return_value,
processor_timeout=timedelta.max,
signal_conn=MagicMock(),
dag_ids=[],
pickle_dags=False,
async_mode=True,
)

manager.set_file_paths(dag_files)
assert manager._file_path_queue == []
manager.prepare_file_path_queue()

expected_order = dag_files
random.Random(get_hostname()).shuffle(expected_order)
assert manager._file_path_queue == expected_order

# Verify running it again produces same order
manager._file_paths = []
manager.prepare_file_path_queue()
assert manager._file_path_queue == expected_order

@conf_vars({("scheduler", "file_parsing_sort_mode"): "modified_time"})
@mock.patch("zipfile.is_zipfile", return_value=True)
@mock.patch("airflow.utils.file.might_contain_dag", return_value=True)
@mock.patch("airflow.utils.file.find_path_from_directory", return_value=True)
@mock.patch("airflow.utils.file.os.path.isfile", return_value=True)
@mock.patch("airflow.utils.file.os.path.getmtime")
def test_file_paths_in_queue_sorted_by_modified_time(
self, mock_getmtime, mock_isfile, mock_find_path, mock_might_contain_dag, mock_zipfile
):
"""Test files are sorted by modified time"""
paths_with_mtime = {"file_3.py": 3.0, "file_2.py": 2.0, "file_4.py": 5.0, "file_1.py": 4.0}
dag_files = list(paths_with_mtime.keys())
mock_getmtime.side_effect = list(paths_with_mtime.values())
mock_find_path.return_value = dag_files

manager = DagFileProcessorManager(
dag_directory='directory',
max_runs=1,
processor_factory=MagicMock().return_value,
processor_timeout=timedelta.max,
signal_conn=MagicMock(),
dag_ids=[],
pickle_dags=False,
async_mode=True,
)

manager.set_file_paths(dag_files)
assert manager._file_path_queue == []
manager.prepare_file_path_queue()
assert manager._file_path_queue == ['file_4.py', 'file_1.py', 'file_3.py', 'file_2.py']

def test_find_zombies(self):
manager = DagFileProcessorManager(
dag_directory='directory',
Expand Down

0 comments on commit 2e3eb42

Please sign in to comment.