From 375d26d880338e4d86b576df466f70183085d57f Mon Sep 17 00:00:00 2001 From: Leon Smith Date: Tue, 30 Mar 2021 13:55:00 +0100 Subject: [PATCH] Pass queue to BaseExecutor.execute_async like in airflow 1.10 (#14861) Any schedulers depending on the queue functionality that haven't overridden `trigger_tasks` method will see queue functionality break when upgrading to 2.0 --- airflow/executors/base_executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index fc753055194ea..8e87901115338 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -182,10 +182,10 @@ def trigger_tasks(self, open_slots: int) -> None: sorted_queue = self.order_queued_tasks_by_priority() for _ in range(min((open_slots, len(self.queued_tasks)))): - key, (command, _, _, ti) = sorted_queue.pop(0) + key, (command, _, queue, ti) = sorted_queue.pop(0) self.queued_tasks.pop(key) self.running.add(key) - self.execute_async(key=key, command=command, queue=None, executor_config=ti.executor_config) + self.execute_async(key=key, command=command, queue=queue, executor_config=ti.executor_config) def change_state(self, key: TaskInstanceKey, state: str, info=None) -> None: """