
kubernetes - Airflow KubernetesPodOperator cannot start Dask client because of multiprocessing - Stack Overflow


In an Airflow pipeline, I want to run a KubernetesPodOperator task through the @task.kubernetes decorator, with the decorated function calling a custom method:

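from airflow.decorators import task

# default_pod_args holds shared pod settings and is defined elsewhere (not shown).
@task.kubernetes(
    task_id="parse-legislation-nl",
    name="legislation-nl-parse-job",
    **default_pod_args,
)
def parse_legislation_task(**kwargs):
    # Imported inside the task body so that it is resolved inside the pod.
    from my_module import my_dask_method

    my_dask_method.start(**kwargs)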

The method starts a Dask client:

from dask.distributed import Client
dask_client = Client(address=None)

But that fails, and the pod log shows the following error:

2025-01-22 16:08:14,001 - ERROR - Failed to start process
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/distributed/nanny.py", line 452, in instantiate
    result = await self.process.start()
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/distributed/nanny.py", line 750, in start
    await self.process.start()
  File "/usr/local/lib/python3.12/site-packages/distributed/process.py", line 55, in _call_and_set_future
    res = func(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/distributed/process.py", line 215, in _start
    process.start()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
                  ^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/multiprocessing/context.py", line 289, in _Popen
    return Popen(process_obj)
           ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_spawn_posix.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/multiprocessing/spawn.py", line 164, in get_preparation_data
    _check_not_importing_main()
  File "/usr/local/lib/python3.12/multiprocessing/spawn.py", line 140, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

        To fix this issue, refer to the "Safe importing of main module"
        section in .html
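
For reference, the idiom that the error message refers to can be shown with the standard library alone. This is a minimal sketch, not the code from the question: `work`, `report`, and `main` are hypothetical names, and the `spawn` start method is chosen explicitly because the traceback goes through `popen_spawn_posix.py`. Under `spawn`, the child re-imports the main module, so any process creation that runs at import time is attempted again in the child and raises the `RuntimeError` above.

```python
import multiprocessing as mp


def work(x: int) -> int:
    """Hypothetical stand-in for the real job."""
    return x * x


def report(x: int) -> None:
    """Target of the child process: compute and print the result."""
    print(f"child computed {work(x)}")


def main() -> None:
    # Use the same start method that appears in the traceback.
    ctx = mp.get_context("spawn")
    child = ctx.Process(target=report, args=(7,))
    child.start()
    child.join()


if __name__ == "__main__":
    # Only the process started as a script enters this branch. The spawned
    # child imports this file under a different module name, skips the
    # branch, and therefore does not try to start another process.
    main()
```

Whether the script that @task.kubernetes runs inside the pod can be guarded in this way is not established by the question. As a possibility to test rather than a confirmed fix: Dask's `Client(processes=False)` keeps the workers as threads in the calling process, so no child process is started.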