I am using Apache Airflow (2.2.3) taskflow API in conjunction with the TriggerDagRunOperator.
I am trying to dynamically pass a variable to the conf option of TriggerDagRunOperator using jinja. Please see simple example below.
@dag(dag_id='my_dag_trigger')
def first_taskflow():
@task(multiple_outputs=True)
def create_some_values():
return {'v1': value1, 'v2', value2}
@task
def trigger_dag(**kwargs):
TriggerDagRunOperator(
task_id='my_second_dag',
conf={
'v1': "{{ task_instance.xcom_pull(task_ids='create_some_values', key='v1') }}"
}).execute(kwargs)
create_some_values() >> trigger_dag()
first_taskflow()
# ------- DAG TO BE TRIGGERED -------
@dag(dag_id='my_second_dag')
def secondary_taskflow():
@task()
def secondary_task(**context):
print(context['dag_run'].conf.get('v1'))
secondary_task()
secondary_taskflow()
When the value is passed into the triggered DAG the jinja {{ task_instance.xcom_pull(task_ids='create_some_values', key='v1') }}
statement is not returning the parameter value for v1 but returning "{{ task_instance.xcom_pull(task_ids='create_some_values', key='v1') }}" as the value.
Is there a way to pass values dynamically into the conf?
Any help greatly appreciated.