From def961512904443db90e0a980c43dc4d8f8328d5 Mon Sep 17 00:00:00 2001 From: Benoit Person Date: Mon, 29 Mar 2021 11:13:27 +0000 Subject: [PATCH] Scheduler: Remove TIs from starved pools from the critical path. (#14476) Co-authored-by: Ash Berlin-Taylor --- airflow/jobs/scheduler_job.py | 6 ++- tests/jobs/test_scheduler_job.py | 75 ++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 3b801811718fb..9c60c51b1a30e 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -924,8 +924,12 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session = .filter(not_(DM.is_paused)) .filter(TI.state == State.SCHEDULED) .options(selectinload('dag_model')) - .limit(max_tis) ) + starved_pools = [pool_name for pool_name, stats in pools.items() if stats['open'] <= 0] + if starved_pools: + query = query.filter(not_(TI.pool.in_(starved_pools))) + + query = query.limit(max_tis) task_instances_to_examine: List[TI] = with_row_locks( query, diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index be8e1ac168b17..270e1497dc4aa 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -2670,6 +2670,81 @@ def test_scheduler_verify_pool_full_2_slots_per_task(self): # As tasks require 2 slots, only 3 can fit into 6 available assert len(task_instances_list) == 3 + def test_scheduler_keeps_scheduling_pool_full(self): + """ + Test task instances in a pool that isn't full keep getting scheduled even when a pool is full. + """ + dag_d1 = DAG(dag_id='test_scheduler_keeps_scheduling_pool_full_d1', start_date=DEFAULT_DATE) + BashOperator( + task_id='test_scheduler_keeps_scheduling_pool_full_t1', + dag=dag_d1, + owner='airflow', + pool='test_scheduler_keeps_scheduling_pool_full_p1', + bash_command='echo hi', + ) + + dag_d2 = DAG(dag_id='test_scheduler_keeps_scheduling_pool_full_d2', start_date=DEFAULT_DATE) + BashOperator( + task_id='test_scheduler_keeps_scheduling_pool_full_t2', + dag=dag_d2, + owner='airflow', + pool='test_scheduler_keeps_scheduling_pool_full_p2', + bash_command='echo hi', + ) + dagbag = DagBag( + dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"), + include_examples=False, + read_dags_from_db=True, + ) + dagbag.bag_dag(dag=dag_d1, root_dag=dag_d1) + dagbag.bag_dag(dag=dag_d2, root_dag=dag_d2) + dagbag.sync_to_db() + + session = settings.Session() + pool_p1 = Pool(pool='test_scheduler_keeps_scheduling_pool_full_p1', slots=1) + pool_p2 = Pool(pool='test_scheduler_keeps_scheduling_pool_full_p2', slots=10) + session.add(pool_p1) + session.add(pool_p2) + session.commit() + + dag_d1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_d1)) + + scheduler = SchedulerJob(executor=self.null_exec) + scheduler.processor_agent = mock.MagicMock() + + # Create 5 dagruns for each DAG. + # To increase the chances the TIs from the "full" pool will get retrieved first, we schedule all + # TIs from the first dag first. + date = DEFAULT_DATE + for _ in range(5): + dr = dag_d1.create_dagrun( + run_type=DagRunType.SCHEDULED, + execution_date=date, + state=State.RUNNING, + ) + scheduler._schedule_dag_run(dr, {}, session) + date = dag_d1.following_schedule(date) + + date = DEFAULT_DATE + for _ in range(5): + dr = dag_d2.create_dagrun( + run_type=DagRunType.SCHEDULED, + execution_date=date, + state=State.RUNNING, + ) + scheduler._schedule_dag_run(dr, {}, session) + date = dag_d2.following_schedule(date) + + scheduler._executable_task_instances_to_queued(max_tis=2, session=session) + task_instances_list2 = scheduler._executable_task_instances_to_queued(max_tis=2, session=session) + + # Make sure we get TIs from a non-full pool in the 2nd list + assert len(task_instances_list2) > 0 + assert all( + task_instance.pool != 'test_scheduler_keeps_scheduling_pool_full_p1' + for task_instance in task_instances_list2 + ) + def test_scheduler_verify_priority_and_slots(self): """ Test task instances with higher priority are not queued