最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

python - Apache Airflow Taskflow API: Pass XCOM value using JINJA2 into TriggerDagRunOperator conf - Stack Overflow

programmeradmin1浏览0评论

I am using Apache Airflow (2.2.3) taskflow API in conjunction with the TriggerDagRunOperator.

I am trying to dynamically pass a variable to the conf option of TriggerDagRunOperator using jinja. Please see simple example below.

@dag(dag_id='my_dag_trigger')

def first_taskflow():

    @task(multiple_outputs=True)
    def create_some_values():

        return {'v1': value1, 'v2', value2}

    @task
    def trigger_dag(**kwargs):

        TriggerDagRunOperator(
            task_id='my_second_dag',
            conf={
                'v1': "{{ task_instance.xcom_pull(task_ids='create_some_values', key='v1') }}"
            }).execute(kwargs)

    create_some_values() >> trigger_dag()

first_taskflow()

# ------- DAG TO BE TRIGGERED -------

@dag(dag_id='my_second_dag')

def secondary_taskflow():

    @task()
    def secondary_task(**context):
        print(context['dag_run'].conf.get('v1'))

    secondary_task()

secondary_taskflow()

When the value is passed into the triggered DAG the jinja {{ task_instance.xcom_pull(task_ids='create_some_values', key='v1') }} statement is not returning the parameter value for v1 but returning "{{ task_instance.xcom_pull(task_ids='create_some_values', key='v1') }}" as the value.

Is there a way to pass values dynamically into the conf?

Any help greatly appreciated.

与本文相关的文章

发布评论

评论列表(0)

  1. 暂无评论