
Importlib module problem with Airflow PythonOperator


I am trying to implement dynamic DAG logic in my Airflow instance. My tasks are generally SQL or Python tasks, and I have a JSON configuration of the structure below from which the tasks are parsed:

[{ "name": <task_id>, "type": "python|sql","sql_path": "<path_to_sql.sql>", "python_callable": "module.submodule.subsubmodule.function_name"}]

Of course, only one of sql_path and python_callable is used, depending on the value of the type attribute. The python_callable string is a dotted path mirroring the folder structure under the root of the dags folder, as below:

dags
 |__ module
       |___ submodule
               |__ subsubmodule.py (contains def function_name())
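For a layout like this, the resolution should work roughly as in the standalone sketch below (an illustrative check, assuming the dags folder is on sys.path, which Airflow adds by default, and that module/ and module/submodule/ contain __init__.py so they import as packages):

import importlib

# Standalone check of the resolution; assumes the dags folder is on
# sys.path and that the intermediate directories import as packages.
mod = importlib.import_module("module.submodule.subsubmodule")
func = getattr(mod, "function_name")
print(func)  # expected: <function function_name at 0x...>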

This JSON configuration is parsed into DAG tasks using the logic below.

import importlib

from airflow.operators.python import PythonOperator

if step.get("type", "sql") == "sql":
    script_path = step.get("script_path")

    with open(script_path) as sql_file:
        sql_query = sql_file.read()

    push_xcom = False
    task_ = get_clickhouse_query_operator(name, sql_query, push_xcom=push_xcom)
else:
    # Split "module.submodule.subsubmodule.function_name" into the module
    # path and the function name, import the module, and look the function up.
    callable_func_str = step.get("python_callable")
    callable_func_parts = callable_func_str.split(".")
    callable_module = importlib.import_module(".".join(callable_func_parts[:-1]))

    callable = getattr(callable_module, callable_func_parts[-1])
    print(callable)

    task_ = PythonOperator(
        task_id=name,
        python_callable=callable,
        provide_context=True,
        trigger_rule="all_success",
    )

Each DAG also has a dummy start task as its first task, defined as follows:

@task(do_xcom_push=False)
def start():
    print("Starting DAG Run")
    return
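The wiring of start to the generated tasks is not shown here; as a rough sketch (build_task and config are hypothetical stand-ins for the if/else parsing logic and the parsed JSON above), it looks something like:

from datetime import datetime
from airflow import DAG

# Hypothetical wiring; build_task() and config stand in for the parsing
# logic and the JSON configuration described above.
with DAG(dag_id="dynamic_dag", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    previous = start()
    for step in config:
        task_ = build_task(step)
        previous >> task_
        previous = task_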

In the python branch, I try to extract the actual module path containing the function and then get the function object using getattr(). This all seems to work, and the DAGs get rendered fine in the UI. The print(callable) statement also prints the expected function object in the scheduler logs: <function function_name at 0xffff7a5c1cb0>

However, when I start any of the DAGs, it fails right from the start task and does not even generate any logs for the task instance. If I remove all python-type task definitions from the config JSON, the DAGs run successfully. Likewise, if I remove the importlib portion of the code and replace callable with a simple do_nothing function as below, the DAGs run successfully, so my suspicion still rests on the use of importlib.

if step.get("type", "sql") == "sql":
    script_path = step.get("script_path")

    with open(script_path) as sql_file:
        sql_query = sql_file.read()

    push_xcom = False
    task_ = get_clickhouse_query_operator(name, sql_query, push_xcom=push_xcom)
else:
    # Stand-in callable used to rule out the importlib logic.
    def do_nothing():
        print("Doing Nothing")

    task_ = PythonOperator(
        task_id=name,
        python_callable=do_nothing,
        provide_context=True,
        trigger_rule="all_success",
    )

How can I go about solving this problem? I just want to be able to parse a string of a function's module path, as specified in the config JSON, and then use that function as the python_callable for a PythonOperator.
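For reference, a minimal sketch of one variant that keeps the dotted string in the config but defers the actual import to task execution time rather than DAG-parse time (resolve_at_runtime is a hypothetical helper, not an Airflow API):

import importlib

def resolve_at_runtime(dotted_path):
    # Hypothetical helper: returns a callable that performs the import only
    # when the task actually executes, instead of when the DAG file is parsed.
    def _wrapper(**context):
        module_path, func_name = dotted_path.rsplit(".", 1)
        func = getattr(importlib.import_module(module_path), func_name)
        return func(**context)
    return _wrapper

# Usage: task_ = PythonOperator(task_id=name,
#                               python_callable=resolve_at_runtime(callable_func_str))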

Airflow version: 2.6.3
Python version: 3.7.17
