python - Airflow DAG gets stuck when filtering a Polars DataFrame - Stack Overflow

I am dynamically generating Airflow DAGs based on data from a Polars DataFrame. The DAG definition includes filtering this DataFrame at DAG creation time and again inside a task when the DAG runs.

However, when the DAG runs, the task that filters the Polars DataFrame inside the dynamically generated DAG hangs right after printing "before filter". No error is raised. The task keeps running until Airflow eventually throws an exception about memory usage.
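The last line printed only shows roughly where the task stops. A minimal sketch for confirming where it is actually blocked uses the standard-library faulthandler module: arm a timer before the suspect call, and if the call has not returned in time, Python dumps the stack of every Python thread. The helper name run_with_watchdog and the 60-second default are illustrative assumptions. Writing to sys.__stderr__ is also an assumption, because Airflow may redirect sys.stderr to an object without a file descriptor, and where the original stream ends up depends on how the task process was started. If the stall is inside Polars' native code, the dump will point at the Python line that called into it, not at Rust frames.

```python
import faulthandler
import sys


def run_with_watchdog(fn, timeout_s=60.0):
    """Call fn() and return its result (hypothetical helper).

    If fn() is still running after timeout_s seconds, faulthandler writes the
    traceback of every Python thread to the process's original stderr. The
    process is not killed (exit=False), so the hang itself is unchanged.
    """
    # Assumption: sys.__stderr__ has a real file descriptor in the task process.
    faulthandler.dump_traceback_later(
        timeout_s, repeat=False, file=sys.__stderr__, exit=False
    )
    try:
        return fn()
    finally:
        # Disarm the timer whether fn() returned or raised.
        faulthandler.cancel_dump_traceback_later()


# Possible use inside print_hello (df and pl as in the question's code):
# chains = run_with_watchdog(
#     lambda: df.filter(pl.col("key") == "A").select("chain").to_series().to_list(),
#     timeout_s=30,
# )
```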

I am using Airflow 2.7.3 and Polars 0.20.31, in case that is relevant.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import polars as pl

def dag_constructor(name):
    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2023, 1, 1),
        'retries': 1,
    }

    # Define the DAG
    dag = DAG(
        dag_id=name,
        default_args=default_args,
        description='A simple DAG to print Hello World',
        schedule_interval='@daily',
        catchup=False,
    )


    def print_hello():
        print("starting")

        df = pl.DataFrame({
            "key": ["A", "B", "A"],
            "branch": ["br1", "ooo", "br2"],
            "chain": ["ch1", "Y", "ch2"]
        })

        print(df)
        print("before filter") 
        chains = df.filter(pl.col("key") == "A").select("chain").to_series().to_list()
        print("after filter")
        print(chains)
        print("finish dag")


    hello_task = PythonOperator(
        task_id='print_hello',
        python_callable=print_hello,
        dag=dag,
    )

    hello_task

    return dag

df = pl.DataFrame({
    "key": ["A", "B", "A"],
    "branch": ["br1", "ooo", "br2"],
    "chain": ["ch1", "Y", "ch2"]
})
# This filter runs at DAG creation (parse) time, when the DAG file is imported.
chains = df.filter(pl.col("key") == "A").select("chain").to_series().to_list()
# chains = ["ch1", "ch2"]  # Using this hard-coded list instead of the line above works: the task does not hang.
for ch in chains:
    dag_my_id = f"aa__{ch}"
    globals()[dag_my_id] = dag_constructor(dag_my_id)
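The observation that a hard-coded list avoids the hang is consistent with a caveat in the Polars user guide's multiprocessing section: Polars runs queries on a thread pool, and a process forked after that pool is in use inherits its lock state but not its threads, so the next query in the child can deadlock. Airflow's standard task runner, by default, forks the task from a process that has already imported this DAG file, which is exactly where the module-level filter ran. This is a plausible explanation, not something the question confirms. A minimal sketch of one workaround, assuming the parse-time data can be obtained without Polars, derives the DAG ids in plain Python so no Polars query runs before the fork. SOURCE and chains_for_key are hypothetical names.

```python
# Hypothetical parse-time data as plain Python lists, mirroring the question's
# DataFrame, so building the DAG ids does not run any Polars query in the
# process that imports the DAG file.
SOURCE = {
    "key": ["A", "B", "A"],
    "branch": ["br1", "ooo", "br2"],
    "chain": ["ch1", "Y", "ch2"],
}


def chains_for_key(data, key):
    """Return the "chain" values of rows whose "key" equals `key`, in row order."""
    return [chain for k, chain in zip(data["key"], data["chain"]) if k == key]


chains = chains_for_key(SOURCE, "A")
dag_ids = [f"aa__{ch}" for ch in chains]
print(dag_ids)  # ['aa__ch1', 'aa__ch2']
```

In the DAG file this would replace the module-level df and filter lines, followed by `for dag_id in dag_ids: globals()[dag_id] = dag_constructor(dag_id)`, while print_hello keeps using Polars at run time. If the real data must be read with Polars at parse time, another option (not tested here) is setting `execute_tasks_new_python_interpreter = True` in Airflow's `[core]` configuration, so tasks start in a fresh interpreter instead of a fork.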
