Skip to content

Commit

Permalink
Pass queue to BaseExecutor.execute_async like in airflow 1.10 (apache…
Browse files Browse the repository at this point in the history
…#14861)

Any schedulers depending on the queue functionality that haven't overridden
`trigger_tasks` method will see queue functionality break when upgrading to 2.0
  • Loading branch information
leonsmith authored Mar 30, 2021
1 parent 78d9feb commit 375d26d
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,10 @@ def trigger_tasks(self, open_slots: int) -> None:
sorted_queue = self.order_queued_tasks_by_priority()

for _ in range(min((open_slots, len(self.queued_tasks)))):
key, (command, _, _, ti) = sorted_queue.pop(0)
key, (command, _, queue, ti) = sorted_queue.pop(0)
self.queued_tasks.pop(key)
self.running.add(key)
self.execute_async(key=key, command=command, queue=None, executor_config=ti.executor_config)
self.execute_async(key=key, command=command, queue=queue, executor_config=ti.executor_config)

def change_state(self, key: TaskInstanceKey, state: str, info=None) -> None:
"""
Expand Down

0 comments on commit 375d26d

Please sign in to comment.