I have a fairly simple DAG with some primary operational tasks, followed by some cleanup and reporting tasks. I would like to measure the runtime of the operational tasks, either as the difference between the start time of the first task and the end time of the last operational task or, if it is possible, by putting them in a task group, which would be even easier.
I'm not sure how to do this, though. My tasks use custom operators derived from Airflow's base operator class.
Ideally, I would write something like this:
def get_time_delta(start_task, end_task) -> int:
    """Determine time delta in seconds between two airflow tasks."""
    # This is the bit I don't know where to find
    return end_task.dt - start_task.dt

@dag(...)
def my_dag():
    task1 = MyCustomOperator1(...)
    task2 = MyCustomOperator2(...)
    task3 = MyCustomEmitMetricOperator(runtime=get_time_delta(task1, task2))
    task1 >> task2 >> task3
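To make the intended calculation concrete, here is a minimal sketch of the arithmetic, not a confirmed solution. It assumes the timestamps come from Airflow task instances, which record `start_date` and `end_date` once a task has run. `FakeTaskInstance` is a hypothetical stand-in so the snippet runs without Airflow. Note that these values only exist at run time, so the call could not sit in the DAG body as written above; it would presumably have to happen inside the metric operator's `execute()`, for example on objects returned by `context["dag_run"].get_task_instance(task_id)` (worth checking against the Airflow version in use).

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in with the two timestamps a task instance records."""
    task_id: str
    start_date: datetime
    end_date: datetime


def get_time_delta(start_ti, end_ti) -> int:
    """Whole seconds from the start of start_ti to the end of end_ti."""
    # Subtracting datetimes yields a timedelta; convert it to seconds.
    return int((end_ti.end_date - start_ti.start_date).total_seconds())


# Made-up timestamps purely for illustration.
first = FakeTaskInstance(
    "task1",
    datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
    datetime(2024, 1, 1, 12, 5, 0, tzinfo=timezone.utc),
)
last = FakeTaskInstance(
    "task2",
    datetime(2024, 1, 1, 12, 5, 0, tzinfo=timezone.utc),
    datetime(2024, 1, 1, 12, 12, 30, tzinfo=timezone.utc),
)

runtime_seconds = get_time_delta(first, last)
print(runtime_seconds)
```

The function is duck-typed on `start_date` and `end_date`, so the same body should work on any object exposing those two attributes.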
Any help on how to achieve this would be greatly appreciated.