I have an Airflow DAG with six tasks, all using GKEPodOperator. My DAG has an execution_timeout of 1 hour, but sometimes Task 5 takes longer to execute, causing failures.
I want to set a dynamic execution_timeout for Task 5 using the following logic:

- DAG start time + 60 minutes (the total execution window)
- Subtract the end time of Task 4 (the task immediately before Task 5)
- Leave a 2-minute buffer for Task 6 to complete
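
In other words, the arithmetic I'm after is roughly the following (a minimal sketch; the function name and defaults are just for illustration, not part of my DAG):

```python
from datetime import timedelta

def compute_task5_timeout(dag_start, task4_end):
    """Remaining budget for Task 5 inside the 60-minute window."""
    deadline = dag_start + timedelta(minutes=60)   # hard end of the execution window
    remaining = deadline - task4_end               # budget left once Task 4 finishes
    return remaining - timedelta(minutes=2)        # reserve 2 minutes for Task 6
```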
I tried retrieving the DAG start time and Task 4's end time using XCom and dag_run.get_task_instance(), but the approach fails when used with GKEPodOperator.
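
A simplified version of what I attempted is below: a pre_execute hook on Task 5 that reads the DAG run's start time and Task 4's end time, then tries to override the operator's execution_timeout at runtime. The task id "task_4" and the hook wiring are illustrative of my setup, not exact code.

```python
from datetime import timedelta

def set_task5_timeout(context):
    dag_run = context["dag_run"]
    task4_ti = dag_run.get_task_instance(task_id="task_4")   # Task 4's task instance
    deadline = dag_run.start_date + timedelta(minutes=60)    # DAG start + 60 minutes
    remaining = deadline - task4_ti.end_date - timedelta(minutes=2)
    context["task"].execution_timeout = remaining            # does not seem to take effect

# Wired into the operator like:
# task_5 = GKEPodOperator(task_id="task_5", pre_execute=set_task5_timeout, ...)
```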
Is there a way to dynamically compute and apply execution_timeout within Task 5 itself, without creating an extra task?
Any suggestions on achieving this in Airflow?