
How can I assign schedule_interval dynamically to an Airflow DAG? - Stack Overflow


I am creating DAGs dynamically from metadata that is stored in a table.

I read the table and build a list of dictionaries that looks like the one below.

This is sample data; the table has other columns as well.

metadata = [
    {'source_system': 'src1', 'table_name': 'table1', 'dag_group_number': 1, 'job_schedule_cron': '0 5 * * *'},
    {'source_system': 'src1', 'table_name': 'table1', 'dag_group_number': 1, 'job_schedule_cron': '0 5 * * *'},
    {'source_system': 'src1', 'table_name': 'table1', 'dag_group_number': 2, 'job_schedule_cron': '0 5 * * *'},
    {'source_system': 'src2', 'table_name': 'table2', 'dag_group_number': 1, 'job_schedule_cron': '0 6 * * *'},
    {'source_system': 'src2', 'table_name': 'table2', 'dag_group_number': 2, 'job_schedule_cron': '0 6 * * *'},
    {'source_system': 'src3', 'table_name': 'table3', 'dag_group_number': 2, 'job_schedule_cron': '0 10 * * *'}
]

After generating the dictionary, I iterate over it and register the DAGs so that they appear in the UI, using the globals() approach.

Here is the sample code from Astronomer (link):

from pendulum import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator


def create_dag(dag_id, schedule, dag_number, default_args):
    def hello_world_py():
        print("Hello World")
        print("This is DAG: {}".format(str(dag_number)))

    generated_dag = DAG(dag_id, schedule=schedule, default_args=default_args)

    with generated_dag:
        PythonOperator(task_id="hello_world", python_callable=hello_world_py)

    return generated_dag


# build a dag for each number in range(1, 4)
for n in range(1, 4):
    dag_id = "loop_hello_world_{}".format(str(n))

    default_args = {"owner": "airflow", "start_date": datetime(2023, 7, 1)}

    schedule = "@daily"
    dag_number = n

    globals()[dag_id] = create_dag(dag_id, schedule, dag_number, default_args)

My question is: how do I set the schedule dynamically? The schedule for each source is defined in the metadata table, and I want to use that same schedule in the DAG.

If I use

dag = DAG(
    dag_id='notice_slack',
    default_args=args,
    schedule_interval=metadata[source_system][job_schedule_cron],
    dagrun_timeout=timedelta(minutes=1))

I am getting errors: AirflowTimetableInvalid: Exactly 5,6 or 7 columns has to be specified for iterator expression

and

Airflow Timetable Invalid: ['0 7 * * *'] is not acceptable.

Any suggestions or guidance on how to fix this?

Asked Feb 7 at 21:59 by Veeresh; edited Feb 8 at 4:54.

1 Answer

There is another way to solve this issue: keep each DAG's schedule in an Airflow Variable.

Step 1:

Create an Airflow Variable for each DAG. The key should be the DAG name and the value should be the cron schedule, for example key = loop_hello_world_1, value = 0 5 * * *

The Airflow Variables should be created at deployment time, while the Airflow services are coming up.
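As a rough sketch of Step 1, the key/value pairs could be derived from the question's metadata rather than typed by hand. The DAG naming scheme (source system plus group number) and the function name build_schedule_variables are assumptions made for illustration, not something the question specifies. The resulting JSON could then be saved to a file and loaded at deployment time, for instance with the `airflow variables import` CLI command.

```python
import json

# Sample rows copied from the question's metadata table.
metadata = [
    {'source_system': 'src1', 'table_name': 'table1', 'dag_group_number': 1, 'job_schedule_cron': '0 5 * * *'},
    {'source_system': 'src1', 'table_name': 'table1', 'dag_group_number': 1, 'job_schedule_cron': '0 5 * * *'},
    {'source_system': 'src1', 'table_name': 'table1', 'dag_group_number': 2, 'job_schedule_cron': '0 5 * * *'},
    {'source_system': 'src2', 'table_name': 'table2', 'dag_group_number': 1, 'job_schedule_cron': '0 6 * * *'},
    {'source_system': 'src2', 'table_name': 'table2', 'dag_group_number': 2, 'job_schedule_cron': '0 6 * * *'},
    {'source_system': 'src3', 'table_name': 'table3', 'dag_group_number': 2, 'job_schedule_cron': '0 10 * * *'},
]


def build_schedule_variables(rows):
    """Map each DAG id to one cron string, rejecting conflicting rows."""
    variables = {}
    for row in rows:
        # Hypothetical naming scheme; use whatever ids your DAG factory creates.
        dag_id = "{}_group_{}".format(row["source_system"], row["dag_group_number"])
        cron = row["job_schedule_cron"].strip()
        # setdefault stores the first cron seen for a DAG id and returns it afterwards.
        if variables.setdefault(dag_id, cron) != cron:
            raise ValueError("conflicting schedules for {}".format(dag_id))
    return variables


schedule_variables = build_schedule_variables(metadata)

# JSON payload with one key per DAG id; save it to a file to import it as Variables.
payload = json.dumps(schedule_variables, indent=2)
print(payload)
```

Duplicate rows for the same DAG id are collapsed as long as they agree on the schedule, so each Variable holds exactly one cron string.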

Step 2:

Inside your DAG creation code, look up the Airflow Variable by the DAG name and use its value as the schedule.

Similarly, you have to create one Variable in the Airflow Variables section for each DAG id.

Sample code that retrieves the value with the Variable.get function follows. You can also pass a default value to get; it is used as a fallback in case the key is not present in the environment.

from airflow.models import Variable

# build a dag for each number in range(1, 4)
for n in range(1, 4):
    dag_id = "loop_hello_world_{}".format(str(n))

    # Fetch the cron schedule from the Airflow Variable named after the DAG.
    # The second argument is the fallback returned when the key is missing;
    # with None the DAG is created without a schedule (manual triggers only).
    schedule_cron = Variable.get(dag_id, None)

    default_args = {"owner": "airflow", "start_date": datetime(2023, 7, 1)}

    dag_number = n

    globals()[dag_id] = create_dag(dag_id, schedule_cron, dag_number, default_args)
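
One more thing that may be worth checking: the second error in the question prints ['0 7 * * *'], which looks like a list rather than a plain string, so the value handed to the DAG may be a one-element list instead of the cron text itself. The helper below is a minimal sketch of a guard for that case. The name normalize_cron is hypothetical, and the 5 to 7 field check simply mirrors the wording of the first error message rather than any Airflow API.

```python
def normalize_cron(value):
    """Return a single cron string, unwrapping a one-element list or tuple."""
    if isinstance(value, (list, tuple)):
        if len(value) != 1:
            raise ValueError("expected exactly one schedule, got {!r}".format(value))
        value = value[0]
    if not isinstance(value, str):
        raise TypeError("schedule must be a string, got {!r}".format(value))

    # Collapse repeated whitespace so the field count is reliable.
    cron = " ".join(value.split())

    # Presets such as @daily are passed through unchanged.
    if cron.startswith("@"):
        return cron

    # The error in the question asks for 5, 6 or 7 columns.
    if not 5 <= len(cron.split()) <= 7:
        raise ValueError("not a 5-7 field cron expression: {!r}".format(cron))
    return cron


print(normalize_cron(["0 7 * * *"]))
print(normalize_cron("  0  5 * * * "))
```

Calling normalize_cron on the value read from the metadata or from the Variable, before passing it to create_dag, would surface a malformed schedule at parse time with a clearer message.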