
airflow - How to pass a variable from a function to dependent dag tasks - Stack Overflow


I am trying to write a function, as below, that returns a date:

    from datetime import datetime, timedelta

    def date_fn():
        d = datetime.today() - timedelta(days=days_to_subtract)
        return d

I then want to pass this "d" to my DAG and its dependent DAG:

    # constants
    SCHEDULE = "0 8 * * 4"

    with DAG(
        dag_id="processthe_files",
        start_date=datetime(2024, 10, 8),
        schedule_interval=SCHEDULE,
    ) as directed_acyclic_graph:
        file_processing = Job(
            # this calls a python callable which runs a databricks job
        ).to_task

        trigger_processtables = TriggerDagRunOperator(
            task_id="trigger_processtables",
            trigger_dag_id="processthe_tables",
            wait_for_completion=True,
        )
        file_processing >> trigger_processtables


    with DAG(
        dag_id="processthe_tables",
        start_date=datetime(2024, 10, 8),
        schedule_interval=None,
    ) as directed_acyclic_graph:
        processtables = Job(
            # this calls a python callable which runs a databricks job
        ).to_task

I want to pass the value of d as a variable to all the tasks in the processthe_files DAG and also to the dependent DAG. I looked at the XCom concept but was unable to understand it fully. Can anyone please help me achieve this?


asked Mar 20 at 12:49 by Aviator

1 Answer


Airflow provides a solution to your use case: you don't have to calculate the date explicitly in your own code.

You can use Airflow's template variables: variables like ds and ds_nodash give each task the run's logical date as a string, and you can subtract days from that value inside the task.
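A minimal sketch of that idea: instead of calling datetime.today() at DAG-parse time, let each task receive the templated "{{ ds }}" string (the run's logical date, formatted YYYY-MM-DD) and subtract days from it with plain Python. The helper name date_minus and the example days value are illustrative, not from the original snippets; days_to_subtract would come from the asker's own constants.

```python
from datetime import datetime, timedelta

def date_minus(ds: str, days_to_subtract: int) -> str:
    """Subtract days from Airflow's templated ``ds`` string (YYYY-MM-DD)."""
    d = datetime.strptime(ds, "%Y-%m-%d") - timedelta(days=days_to_subtract)
    return d.strftime("%Y-%m-%d")

# Inside a task, Airflow renders "{{ ds }}" before execution, so a call like
#   date_minus("{{ ds }}", days_to_subtract)
# receives a concrete date such as "2024-10-10" at runtime.
print(date_minus("2024-10-10", 3))  # prints "2024-10-07"
```

Because ds is tied to the run's logical date rather than the wall clock, backfills and retries compute the same value as the original run, which datetime.today() would not.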

https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
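To forward the same date to the dependent DAG, one common pattern (a sketch, not verified against the asker's Job wrapper) is to pass it through TriggerDagRunOperator's conf argument and read it back from dag_run.conf in the triggered DAG. The key name run_date and the helper resolve_run_date are assumptions for illustration; only the helper below is executable as-is.

```python
from typing import Optional

# In the triggering DAG (sketch):
#   trigger_processtables = TriggerDagRunOperator(
#       task_id="trigger_processtables",
#       trigger_dag_id="processthe_tables",
#       conf={"run_date": "{{ ds }}"},  # templated at trigger time
#       wait_for_completion=True,
#   )
#
# Tasks in the triggered DAG can then read dag_run.conf. A small helper:

def resolve_run_date(conf: Optional[dict], fallback_ds: str) -> str:
    """Prefer the date handed over by the triggering DAG; else use this run's ds."""
    if conf and "run_date" in conf:
        return conf["run_date"]
    return fallback_ds

print(resolve_run_date({"run_date": "2024-10-07"}, "2024-10-10"))  # prints "2024-10-07"
print(resolve_run_date(None, "2024-10-10"))                        # prints "2024-10-10"
```

The fallback keeps the triggered DAG usable on its own (e.g. for manual runs), since conf is empty when nothing triggered it.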
