Skip to content

Commit

Permalink
Scheduler: Remove TIs from starved pools from the critical path. (apa…
Browse files Browse the repository at this point in the history
…che#14476)

Co-authored-by: Ash Berlin-Taylor <ash@apache.org>
  • Loading branch information
bperson and ashb authored Mar 29, 2021
1 parent e7eb449 commit def9615
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 1 deletion.
6 changes: 5 additions & 1 deletion airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -924,8 +924,12 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
.filter(not_(DM.is_paused))
.filter(TI.state == State.SCHEDULED)
.options(selectinload('dag_model'))
.limit(max_tis)
)
starved_pools = [pool_name for pool_name, stats in pools.items() if stats['open'] <= 0]
if starved_pools:
query = query.filter(not_(TI.pool.in_(starved_pools)))

query = query.limit(max_tis)

task_instances_to_examine: List[TI] = with_row_locks(
query,
Expand Down
75 changes: 75 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2670,6 +2670,81 @@ def test_scheduler_verify_pool_full_2_slots_per_task(self):
# As tasks require 2 slots, only 3 can fit into 6 available
assert len(task_instances_list) == 3

def test_scheduler_keeps_scheduling_pool_full(self):
"""
Test task instances in a pool that isn't full keep getting scheduled even when a pool is full.
"""
dag_d1 = DAG(dag_id='test_scheduler_keeps_scheduling_pool_full_d1', start_date=DEFAULT_DATE)
BashOperator(
task_id='test_scheduler_keeps_scheduling_pool_full_t1',
dag=dag_d1,
owner='airflow',
pool='test_scheduler_keeps_scheduling_pool_full_p1',
bash_command='echo hi',
)

dag_d2 = DAG(dag_id='test_scheduler_keeps_scheduling_pool_full_d2', start_date=DEFAULT_DATE)
BashOperator(
task_id='test_scheduler_keeps_scheduling_pool_full_t2',
dag=dag_d2,
owner='airflow',
pool='test_scheduler_keeps_scheduling_pool_full_p2',
bash_command='echo hi',
)
dagbag = DagBag(
dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
include_examples=False,
read_dags_from_db=True,
)
dagbag.bag_dag(dag=dag_d1, root_dag=dag_d1)
dagbag.bag_dag(dag=dag_d2, root_dag=dag_d2)
dagbag.sync_to_db()

session = settings.Session()
pool_p1 = Pool(pool='test_scheduler_keeps_scheduling_pool_full_p1', slots=1)
pool_p2 = Pool(pool='test_scheduler_keeps_scheduling_pool_full_p2', slots=10)
session.add(pool_p1)
session.add(pool_p2)
session.commit()

dag_d1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_d1))

scheduler = SchedulerJob(executor=self.null_exec)
scheduler.processor_agent = mock.MagicMock()

# Create 5 dagruns for each DAG.
# To increase the chances the TIs from the "full" pool will get retrieved first, we schedule all
# TIs from the first dag first.
date = DEFAULT_DATE
for _ in range(5):
dr = dag_d1.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=date,
state=State.RUNNING,
)
scheduler._schedule_dag_run(dr, {}, session)
date = dag_d1.following_schedule(date)

date = DEFAULT_DATE
for _ in range(5):
dr = dag_d2.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=date,
state=State.RUNNING,
)
scheduler._schedule_dag_run(dr, {}, session)
date = dag_d2.following_schedule(date)

scheduler._executable_task_instances_to_queued(max_tis=2, session=session)
task_instances_list2 = scheduler._executable_task_instances_to_queued(max_tis=2, session=session)

# Make sure we get TIs from a non-full pool in the 2nd list
assert len(task_instances_list2) > 0
assert all(
task_instance.pool != 'test_scheduler_keeps_scheduling_pool_full_p1'
for task_instance in task_instances_list2
)

def test_scheduler_verify_priority_and_slots(self):
"""
Test task instances with higher priority are not queued
Expand Down

0 comments on commit def9615

Please sign in to comment.