
python - In Airflow 2.10 can I use dynamic task mapping with BranchPythonOperator? - Stack Overflow


Below is a minimal implementation of a branch operator using the TaskFlow API.

The DAG executes either odd_task or even_task, depending on the task ID string returned by branch_on_condition. Whichever task runs also receives the return_int value on which the branching decision was made. final_task is then executed. All straightforward.

from airflow.decorators import dag, task


@dag(dag_id='Example_Dag_Simple_Branch')
def simple_dag():
    
    @task(task_id='return_int')
    def return_int():
        return 3


    @task.branch(task_id='branch_on_condition')
    def branch_on_condition(upstream_value):
        if upstream_value & 1:
            return 'odd_task'
        else:
            return 'even_task'
        
        
    @task(task_id='odd_task')
    def odd_task(input_val):
        print(f"{input_val} is an odd number")
        return input_val
    
    
    @task(task_id='even_task')
    def even_task(input_val):
        print(f"{input_val} is an even number")
        return input_val        


    @task(task_id='final_task', trigger_rule='one_success')
    def final_task():
        print('final task executed')
        return
    
    
    returned_int = return_int()
    branch_value = branch_on_condition(upstream_value=returned_int)
    even_task_return = even_task(input_val=returned_int)
    odd_task_return = odd_task(input_val=returned_int)
    final_return = final_task()
    
    branch_value >> [even_task_return, odd_task_return] >> final_return
    
simple_dag()

Depending on the value returned by return_int, this logs either INFO - 3 is an odd number or, for an even value such as 2, INFO - 2 is an even number.

Why am I unable to implement a similar pattern using dynamically mapped tasks?

If the return_int task is replaced by a return_list task that returns a list of n integers, I can map the branch task over that list without problems, but BOTH branches are executed for every element!

from airflow.decorators import dag, task


@dag(dag_id='Example_Dag_Dynamic_Branch')
def simple_dag():
    
    @task(task_id='return_list')
    def return_list():
        return [1,2,3,4,5,6,7]


    @task.branch(task_id='branch_on_condition')
    def branch_on_condition(upstream_value):
        if upstream_value & 1:
            return 'odd_task'
        else:
            return 'even_task'
        
        
    @task(task_id='odd_task')
    def odd_task(input_val):
        print(f"{input_val} is an odd number")
        return input_val
    
    
    @task(task_id='even_task')
    def even_task(input_val):
        print(f"{input_val} is an even number")
        return input_val        


    @task(task_id='final_task', trigger_rule='one_success')
    def final_task():
        print('final task executed')
        return
    
    
    returned_list = return_list()
    branch_value = branch_on_condition.expand(upstream_value=returned_list)
    even_task_return = even_task.expand(input_val=returned_list)
    odd_task_return = odd_task.expand(input_val=returned_list)
    final_return = final_task()
    
    branch_value >> [even_task_return, odd_task_return] >> final_return
    
simple_dag()

As a result, odd_task runs as 7 mapped task instances, each logging INFO - n is an odd number for n from 1 through 7, which is of course wrong for the even values.
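One thing worth noting about the DAG above: odd_task.expand(input_val=returned_list) and even_task.expand(input_val=returned_list) both map over the full list, and nothing in those calls refers to the branch result. A possible workaround, sketched here in plain Python so it runs without Airflow, is to filter the list before mapping instead of branching per element. The helper names select_odd and select_even are hypothetical and not part of the original DAG, and this swaps branching for filtering rather than making a mapped branch work.

```python
# Plain-Python sketch of "filter first, then map".
# select_odd / select_even are hypothetical helpers, not Airflow APIs.

def select_odd(values):
    """Return only the odd integers, preserving order."""
    return [v for v in values if v & 1]


def select_even(values):
    """Return only the even integers, preserving order."""
    return [v for v in values if not v & 1]


values = [1, 2, 3, 4, 5, 6, 7]  # same list as return_list in the question
odd_inputs = select_odd(values)
even_inputs = select_even(values)

print(odd_inputs)
print(even_inputs)
```

In a DAG, each helper could presumably be wrapped with @task and its output passed to odd_task.expand(input_val=...) or even_task.expand(input_val=...), so that each mapped task only ever sees the values meant for it. That wiring is an assumption and is untested here.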

I have tried

  • Packing the branch and downstream tasks into a task group.
  • Messing around with .expand() and .partial()
  • Reading related questions: 1. 2. 3.

None of these really answers the question!

Finally, I am aware that this behavior could literally just be a standalone if/else block within a single operator. But in reality the if/else logic could become quite complex, and it would be nice to separate it into multiple tasks.
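For reference, the standalone if/else alternative mentioned above might look roughly like the following plain-Python sketch. The names handle_odd, handle_even and process_value are hypothetical. Keeping each branch in its own function at least separates the logic, even though everything would run inside one task.

```python
# Sketch of the "single task with if/else" fallback, in plain Python.
# handle_odd, handle_even and process_value are hypothetical names.

def handle_odd(value):
    """Logic for the odd branch."""
    return f"{value} is an odd number"


def handle_even(value):
    """Logic for the even branch."""
    return f"{value} is an even number"


def process_value(value):
    """Pick the handler for one value, using the same test as branch_on_condition."""
    handler = handle_odd if value & 1 else handle_even
    return handler(value)


messages = [process_value(v) for v in [1, 2, 3, 4, 5, 6, 7]]
for message in messages:
    print(message)
```

Under the assumption that process_value is decorated with @task, it could be mapped with .expand() over the list, giving one task instance per element. The trade-off is that the branch taken is then only visible in the logs, not in the graph view.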
