airflow - Trigger a task iff direct upstream failed - Stack Overflow

Let's say I have a dag with multiple tasks

task1 >> task2 >> [task3, task4]

I want task1 to be executed at the beginning.

Then task2.

If task2 ends with success, then execute task3, else execute task4.

I've tried setting the trigger rule all_failed on task4 and, whether task2 fails or ends with success, everything works as desired.

However, if task1 fails, task2 and task3 are marked as upstream_failed, BUT task4 is executed.

According to https://airflow.apache.org/docs/apache-airflow/2.9.3/core-concepts/dags.html#trigger-rules this is "the expected behavior" ("all_failed: All upstream tasks are in a failed or upstream_failed state"), but then it seems to me there is no way to set a trigger rule such as "execute task4 only if the direct upstream is failed (and only failed)".

How can I achieve that?

Asked Feb 10 at 16:35 by Vito De Tullio

3 Answers

The issue arises because all_failed is satisfied by both the failed and the upstream_failed state, which means task4 runs even if task1 fails. Airflow does not have a built-in "only direct upstream failed" trigger rule, but you can use BranchPythonOperator to branch dynamically based on task2's own outcome.

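from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule

def task2_logic():
    # Placeholder (not shown in the original answer); raise an exception to fail task2
    pass

def branch_task(**context):
    # Assumed implementation: pick the branch from task2's own state, not its upstream's
    state = context['dag_run'].get_task_instance('task2').state
    if state == State.SUCCESS:
        return 'task3'
    return 'task4' if state == State.FAILED else None  # None skips both branches

dag = DAG(dag_id='branching_dag', start_date=datetime(2024, 1, 1), schedule_interval=None)

task1 = EmptyOperator(task_id="task1", dag=dag)

task2 = PythonOperator(
    task_id='task2',
    python_callable=task2_logic,
    dag=dag
)

# ALL_DONE lets the branch task run even when task2 has failed
branch = BranchPythonOperator(
    task_id='branching',
    python_callable=branch_task,
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag
)

task3 = EmptyOperator(task_id="task3", dag=dag)
task4 = EmptyOperator(task_id="task4", dag=dag)

task1 >> task2 >> branch
branch >> [task3, task4]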
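
To make the difference between the two conditions concrete, here is a toy model in plain Python. It is not Airflow's implementation: the state names are ordinary strings, `all_failed_fires` only restates the documentation sentence quoted in the question, and `only_direct_failure` is a hypothetical name for the stricter rule the question asks for.

```python
# Toy model only: plain strings stand in for Airflow task states.
FAILED_LIKE = {"failed", "upstream_failed"}


def all_failed_fires(upstream_states):
    """all_failed as quoted in the question: every upstream is failed or upstream_failed."""
    return all(state in FAILED_LIKE for state in upstream_states)


def only_direct_failure(upstream_states):
    """Hypothetical stricter rule: every direct upstream task failed by itself."""
    return all(state == "failed" for state in upstream_states)


# task4 has a single direct upstream, task2.
for task2_state in ("success", "failed", "upstream_failed"):
    print(task2_state, all_failed_fires([task2_state]), only_direct_failure([task2_state]))
```

The two functions disagree only for upstream_failed, which is exactly the case where task1 has failed. A branch callable that inspects task2's own state behaves like the second function.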

I think you can use a ShortCircuitOperator between task1 and task2 in this case. It skips all downstream tasks, including task3 and task4, if its python_callable returns False (which in the code below happens when task1's state is FAILED).

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule


def validate(**context):
    # Continue only if task1 did not fail; returning False skips everything downstream
    task = context['dag_run'].get_task_instance('task1')
    return task.state != State.FAILED

dag = DAG(
    dag_id="dag",
    start_date=datetime(2025, 2, 12),
    schedule_interval='@once',
)

with dag:
    task1 = PythonOperator(
        task_id='task1',
        python_callable=lambda: True
    )

    # ALL_DONE lets the check run even when task1 has failed
    short_circuit = ShortCircuitOperator(
        task_id='short_circuit',
        python_callable=validate,
        trigger_rule=TriggerRule.ALL_DONE
    )

    task2 = PythonOperator(
        task_id='task2',
        python_callable=lambda: False
    )

    task1 >> short_circuit >> task2 # >> [task3, task4]
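
As a rough check of what this layout should produce, the sketch below tabulates the expected result in plain Python. It is a toy model rather than Airflow code, and it assumes that task3 and task4 are appended as in the commented-out dependency, with task3 on the default trigger rule and task4 on all_failed as in the question. The function name `expected_outcome` and the labels "runs", "skipped" and "upstream_failed" are illustrative only.

```python
def expected_outcome(task1_state: str, task2_state: str) -> dict:
    """Toy model of the DAG above; states are plain strings, not Airflow objects."""
    if task1_state == "failed":
        # validate() returns False, so the short circuit skips everything downstream.
        return {"task2": "skipped", "task3": "skipped", "task4": "skipped"}
    if task2_state == "success":
        # task4 uses all_failed, which is not satisfied, so it is skipped.
        return {"task2": "runs", "task3": "runs", "task4": "skipped"}
    # task2 failed: task3 cannot run, while task4 (all_failed) does.
    return {"task2": "runs", "task3": "upstream_failed", "task4": "runs"}


for states in (("failed", "success"), ("success", "success"), ("success", "failed")):
    print(states, expected_outcome(*states))
```

In this model task4 runs only in the last case, where task1 succeeded and task2 itself failed.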

You'll need to modify the task dependencies and trigger rules to achieve the behavior you want. Add a start task as an anchor point using EmptyOperator, then introduce two additional empty operators (task2_success and task2_failure) that act as gates:

  • task2_success only proceeds if task2 succeeds

  • task2_failure only proceeds if task2 fails

Both task3 and task4 should use the ALL_SUCCESS trigger rule, but depend on different gates:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime

def task1_function():
    # Your task1 logic here
    pass

def task2_function():
    # Your task2 logic here
    pass

def task3_function():
    # Your task3 logic here
    pass

def task4_function():
    # Your task4 logic here
    pass

with DAG(
    'conditional_execution_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None
) as dag:

    # Create a starting point
    start = EmptyOperator(task_id='start')

    # Main tasks
    task1 = PythonOperator(
        task_id='task1',
        python_callable=task1_function
    )

    task2 = PythonOperator(
        task_id='task2',
        python_callable=task2_function
    )

    # Create branching conditions using EmptyOperator
    task2_success = EmptyOperator(
        task_id='task2_success',
        trigger_rule=TriggerRule.ALL_SUCCESS
    )

    task2_failure = EmptyOperator(
        task_id='task2_failure',
        trigger_rule=TriggerRule.ONE_FAILED
    )

    task3 = PythonOperator(
        task_id='task3',
        python_callable=task3_function,
        trigger_rule=TriggerRule.ALL_SUCCESS
    )

    task4 = PythonOperator(
        task_id='task4',
        python_callable=task4_function,
        trigger_rule=TriggerRule.ALL_SUCCESS
    )

    # Set dependencies
    start >> task1 >> task2
    task2 >> [task2_success, task2_failure]
    task2_success >> task3
    task2_failure >> task4
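
With this setup:

  • If task1 fails, task2 and task2_success are marked as upstream_failed, so task3 does not run. Be aware that one_failed is also satisfied when task2 is upstream_failed, so test on your Airflow version whether the task2_failure gate (and therefore task4) still runs in this case
  • If task1 succeeds but task2 fails, only the task2_failure gate passes, leading to task4
  • If both task1 and task2 succeed, only the task2_success gate passes, leading to task3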