In an Airflow pipeline I want to run a KubernetesPodOperator task, defined with the `@task.kubernetes` decorator, that calls a custom method:
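```python
from airflow.decorators import task

# default_pod_args is defined elsewhere in the DAG file (not shown).
@task.kubernetes(
    task_id="parse-legislation-nl",
    name="legislation-nl-parse-job",
    **default_pod_args,
)
def parse_legislation_task(**kwargs):
    # Imported inside the function so the import runs in the pod.
    from my_module import my_dask_method
    my_dask_method.start(**kwargs)
```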
The method starts a Dask client:
```python
from dask.distributed import Client

dask_client = Client(address=None)
```
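Called with no address, `Client` creates a local cluster in the current process, and by default that cluster starts its workers as separate processes; that process start is where the traceback below fails. A minimal sketch of a thread-based variant, assuming the work in the pod does not need process-level parallelism (threads share the GIL, so pure-Python CPU-bound work will not speed up). `start_dask_work` and `square` are hypothetical names, not the poster's `my_dask_method.start`:

```python
def square(x: int) -> int:
    # Placeholder workload standing in for the real parsing step (hypothetical).
    return x * x


def start_dask_work(values, processes: bool = False):
    """Hypothetical stand-in for my_dask_method.start, not the poster's code."""
    # Imported lazily so importing this module neither requires Dask nor
    # starts anything as a side effect.
    from dask.distributed import Client

    # address=None makes Client create a LocalCluster in this process.
    # processes=False tells that cluster to run its workers as threads here,
    # so no worker process is spawned and the main module is not re-imported.
    with Client(address=None, processes=processes) as client:
        futures = client.map(square, values)
        return client.gather(futures)
```

For example, `start_dask_work([1, 2, 3])` should return `[1, 4, 9]`. Passing `processes=True` brings back the spawned worker processes, and with them the need for the `if __name__ == '__main__':` guard that the error message mentions.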
But the `Client(address=None)` call fails with the following error in the task log:
```
2025-01-22 16:08:14,001 - ERROR - Failed to start process
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/distributed/nanny.py", line 452, in instantiate
    result = await self.process.start()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/distributed/nanny.py", line 750, in start
    await self.process.start()
  File "/usr/local/lib/python3.12/site-packages/distributed/process.py", line 55, in _call_and_set_future
    res = func(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/distributed/process.py", line 215, in _start
    process.start()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
                  ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/multiprocessing/context.py", line 289, in _Popen
    return Popen(process_obj)
           ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_spawn_posix.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/multiprocessing/spawn.py", line 164, in get_preparation_data
    _check_not_importing_main()
  File "/usr/local/lib/python3.12/multiprocessing/spawn.py", line 140, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

        To fix this issue, refer to the "Safe importing of main module"
        section in .html
```
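The idiom the message refers to can be illustrated with the standard library alone. When a process is started with the spawn method (which is what was used here, per the `popen_spawn_posix` frames in the traceback), the child is a fresh interpreter that re-imports the main module; any top-level code that starts processes then runs again in the child while it is still bootstrapping, which raises this `RuntimeError`. A minimal sketch, with hypothetical names `square` and `main`:

```python
import multiprocessing as mp


def square(x: int) -> int:
    # Defined at module top level so a spawned child can find it by import.
    return x * x


def main() -> list[int]:
    # Force "spawn", the start method seen in the traceback.
    ctx = mp.get_context("spawn")
    with ctx.Pool(processes=2) as pool:
        return pool.map(square, [1, 2, 3])


if __name__ == "__main__":
    # Only the parent runs this block. Each spawned child re-imports this file
    # under the name "__mp_main__", so it skips the block and does not try to
    # start processes of its own.
    print(main())
```

Run as a script, this prints `[1, 4, 9]`; moving the `print(main())` call out of the guard to the top level reproduces the same bootstrapping error. Applied to this case, the Dask `Client` would have to be created only from code reached through such a guard; whether the script that `@task.kubernetes` executes in the pod can be arranged that way is an assumption this sketch does not settle.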