I am trying to implement dynamic DAG generation in my Airflow instance. My tasks are generally SQL or Python tasks, and I have a JSON configuration with the below structure that defines the tasks:
[{ "name": <task_id>, "type": "python|sql","sql_path": "<path_to_sql.sql>", "python_callable": "module.submodule.subsubmodule.function_name"}]
Of course, the sql_path and python_callable keys are interchangeable depending on the value of the type attribute. The python_callable string is a dotted-path representation of a folder structure starting from the root of the dags folder, as below:
dags
|__ module
    |__ submodule
        |__ subsubmodule.py  (contains def function_name())
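So dags/module/submodule/subsubmodule.py contains something along these lines (the body here is a simplified stand-in for the real task logic):

# dags/module/submodule/subsubmodule.py -- simplified stand-in
def function_name(**context):
    # the real implementation does actual work; this just shows the shape
    print("Running function_name")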
This JSON configuration is parsed into DAG tasks using the logic below (step is one entry of the parsed JSON and name is its task id; the imports shown sit at the top of the DAG file):

import importlib

from airflow.operators.python import PythonOperator

if step.get("type", "sql") == "sql":
    # read the SQL file referenced by the config entry
    sql_path = step.get("sql_path")
    with open(sql_path) as sql_file:
        sql_query = sql_file.read()
    push_xcom = False
    task_ = get_clickhouse_query_operator(name, sql_query, push_xcom=push_xcom)
else:
    # resolve the dotted path from the config to an actual function object
    callable_func_str = step.get("python_callable")
    callable_func_parts = callable_func_str.split(".")
    callable_module = importlib.import_module(".".join(callable_func_parts[:-1]))
    callable_ = getattr(callable_module, callable_func_parts[-1])
    print(callable_)
    task_ = PythonOperator(
        task_id=name,
        python_callable=callable_,
        provide_context=True,
        trigger_rule="all_success",
    )
Each DAG also has a dummy start task as its first task, defined as below:

from airflow.decorators import task

@task(do_xcom_push=False)
def start():
    print("Starting DAG Run")
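For completeness, the generated tasks hang off start() roughly like this. This is a condensed, hypothetical sketch of my generation loop: the dag_id, schedule, the steps content, and the no-op lambda are stand-ins; in the real loop task_ is built by the if/else shown earlier.

import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator

steps = [{"name": "noop_task", "type": "python"}]  # stand-in for the parsed JSON

with DAG(
    dag_id="dynamic_dag_sketch",  # hypothetical dag_id
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
):
    @task(do_xcom_push=False)
    def start():
        print("Starting DAG Run")

    previous = start()
    for step in steps:
        # in the real loop, task_ comes from the if/else shown earlier
        task_ = PythonOperator(task_id=step["name"],
                               python_callable=lambda: print("noop"))
        previous >> task_
        previous = task_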
In the python block, I extract the module path containing the function and then get the function object using getattr(). This all seems to work: the DAGs render fine in the UI, and the print(callable_) statement prints the expected function object in the scheduler logs, as below.

<function function_name at 0xffff7a5c1cb0>

However, when I start any of these DAGs, it fails right at the start task and does not even generate any logs for the task instance. If I remove all python-type task definitions from the config JSON, the DAGs run successfully. Likewise, if I drop the importlib portion of the code and replace the resolved callable with a simple do_nothing function, as below, the DAGs run successfully, so my suspicion still rests on the use of importlib (see also the standalone check after the next snippet).
if step.get("type", "sql") == "sql":
    sql_path = step.get("sql_path")
    with open(sql_path) as sql_file:
        sql_query = sql_file.read()
    push_xcom = False
    task_ = get_clickhouse_query_operator(name, sql_query, push_xcom=push_xcom)
else:
    def do_nothing():
        print("Doing Nothing")

    task_ = PythonOperator(
        task_id=name,
        python_callable=do_nothing,
        provide_context=True,
        trigger_rule="all_success",
    )
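To rule out the resolution itself, this is the kind of standalone check I would run in a plain Python shell, outside of Airflow (the /path/to/dags entry is a placeholder for my actual dags folder, which Airflow itself puts on sys.path):

import importlib
import sys

sys.path.insert(0, "/path/to/dags")  # placeholder for the real dags folder

dotted = "module.submodule.subsubmodule.function_name"
module_path, _, func_name = dotted.rpartition(".")
func = getattr(importlib.import_module(module_path), func_name)
print(func)  # expect: <function function_name at 0x...>
func()       # calling it should execute the task logic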
How can I go about this problem? I just want to be able to parse a function's dotted module path from the config JSON and use that function as the python_callable for a PythonOperator.

Airflow version: 2.6.3, Python version: 3.7.17