I am dynamically generating Airflow DAGs based on data from a Polars DataFrame. The DAG definition includes filtering this DataFrame at DAG creation time and again inside a task when the DAG runs.
However, when I run the dag and I attempt to filter the polars dataframe inside the dynamically generated DAG, the task gets stuck indefinitely after printing before filter
, without raising an error. Just gets stuck and runs forever until an airflow exception is thrown on memory usage.
I am with airflow 2.7.3 version and polars 0.20.31 for what it is worth mentioning it.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import polars as pl
def dag_constructor(name):
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
# Define the DAG
dag = DAG(
dag_id=f'{name}',
default_args=default_args,
description='A simple DAG to print Hello World',
schedule_interval='@daily',
catchup=False,
)
def print_hello():
print("starting")
df = pl.DataFrame({
"key": ["A", "B", "A"],
"branch": ["br1", "ooo", "br2"],
"chain": ["ch1", "Y", "ch2"]
})
print(df)
print("before filter")
chains = df.filter(pl.col("key") == "A").select("chain").to_series().to_list()
print("after filter")
print(chains)
print("finish dag")
hello_task = PythonOperator(
task_id='print_hello',
python_callable=print_hello,
dag=dag,
)
hello_task
return dag
df = pl.DataFrame({
"key": ["A", "B", "A"],
"branch": ["br1", "ooo", "br2"],
"chain": ["ch1", "Y", "ch2"]
})
chains = df.filter(pl.col("key") == "A").select("chain").to_series().to_list()
## chains = ["ch1", "ch2"] THIS WOULD WORK, AND WONT GET STUCK, if uncommenting and commenting previous line
for ch in chains:
dag_my_id = f"aa__{str(ch)}"
globals()[dag_my_id] = dag_constructor("aa__"+ch)