python - Get Exception after submit the pyFlink Job - Stack Overflow

I am new to PyFlink. I am trying to submit a simple Python job to YARN in application mode, but it fails with an error saying it cannot find the Python file word_count.py. My environment and the exception log are below. Does anyone know how to fix this error?

OS: Ubuntu 22.04.2

Hadoop: 
- node1: namenode, JobhistoryServer, ResourceManager
- node2: datanode, NodeManager
- node3: datanode, NodeManager

Python:
- version: 3.11.11
- location: /opt/anaconda3/envs/py_31111/bin/python

pyflink:
- 1.20.1
- /opt/flink-1.20.1

Create the Python script for the job:

cd /opt/flink-1.20.1
vi word_count2.py
import argparse
import logging
import sys

from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat, FileSink, OutputFileConfig, RollingPolicy

word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)
    env.set_parallelism(1)
    ds = env.from_collection(word_count_data)
    
    word_ds = ds.flat_map(lambda line: line.split(), output_type=Types.STRING())
    tuple_word_ds = word_ds.map(lambda word: (word, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
    group_by_key_ds = tuple_word_ds.key_by(lambda tuple_word: tuple_word[0])
    
    reduce_ds = group_by_key_ds.reduce(lambda i, j: (i[0], i[1] + j[1]))
    reduce_ds.print()

    env.execute()

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    word_count()
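
For reference, the counts this job should produce can be reproduced without Flink. The following is a minimal sketch using only the standard library; it mirrors the flat_map (split on whitespace), map to (word, 1), key_by and reduce steps of the script, but it is not how Flink executes the job, and the helper name `local_word_count` is made up for illustration.

```python
from collections import Counter

word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def local_word_count(lines):
    """Count whitespace-separated tokens, like the flat_map/map/reduce chain."""
    counts = Counter()
    for line in lines:
        # Same tokenization as `line.split()` in the Flink job:
        # punctuation stays attached and case is preserved.
        for word in line.split():
            counts[word] += 1
    return dict(counts)


if __name__ == "__main__":
    for word, count in sorted(local_word_count(word_count_data).items()):
        print(word, count)
```

Because the split is on whitespace only, "To", "to" and "be," are all distinct keys, which is also what the Flink version would group on.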

Submit the job:

conda activate py_31111
cd /opt/anaconda3/envs
zip -r venv.zip ./py_31111
mv venv.zip /opt/flink-1.20.1
cd /opt/flink-1.20.1
./bin/flink run -t yarn-application \
-Dyarn.ship-files=/opt/flink-1.20.1 \
-pyarch venv.zip \
-pyexec venv.zip/py_31111/bin/python3 \
-pyclientexec venv.zip/py_31111/bin/python3 \
-pyexec venv.zip/py_31111/bin/python3 \
-py flink-1.20.1/word_count2.py 
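
For comparison, the PyFlink example for YARN application mode in the Flink CLI documentation ships a directory with `-Dyarn.ship-files`, passes `-pyarch` and `-pyfs` as paths relative to the name of that shipped directory, and selects the entry point with `-pym` (a module name) rather than `-py`. The sketch below only assembles such an argument list in Python. The directory `/opt/pyflink_job` and the helper name `build_submit_command` are hypothetical, and whether this layout resolves the exception reported in this question has not been verified on this cluster.

```python
import shlex
from pathlib import Path


def build_submit_command(ship_dir, archive="venv.zip",
                         env_name="py_31111", module="word_count2"):
    """Assemble a `flink run` argument list following the documented pattern.

    Assumption: `ship_dir` contains both the environment archive and
    `<module>.py`, so that both are shipped to the JobManager.
    """
    ship_name = Path(ship_dir).name  # directory name as seen on the cluster
    python_bin = f"{archive}/{env_name}/bin/python3"
    return [
        "./bin/flink", "run", "-t", "yarn-application",
        f"-Dyarn.ship-files={ship_dir}",
        "-pyarch", f"{ship_name}/{archive}",  # relative to the shipped dir
        "-pyclientexec", python_bin,
        "-pyexec", python_bin,
        "-pyfs", ship_name,                   # makes the module importable
        "-pym", module,                       # module name, no .py suffix
    ]


if __name__ == "__main__":
    # Print the command for inspection instead of running it.
    print(shlex.join(build_submit_command("/opt/pyflink_job")))
```

Shipping a small dedicated directory instead of the whole Flink installation is a choice made for this sketch, not something taken from the original command.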

The job fails with this exception:

java.nio.file.NoSuchFileException: /tmp/pyflink/943c336a-9d42-4b99-befe-594ea803a863/fd17716e-50db-4077-9e29-37d4ff43a8fd/streaming_word_count.py
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) ~[?:1.8.0_432]
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) ~[?:1.8.0_432]
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) ~[?:1.8.0_432]
    at sun.nio.fs.UnixPath.toRealPath(UnixPath.java:837) ~[?:1.8.0_432]
    at org.apache.flink.client.python.PythonEnvUtils.addToPythonPath(PythonEnvUtils.java:309) ~[flink-python-1.20.1.jar:1.20.1]
    at org.apache.flink.client.python.PythonEnvUtils.preparePythonEnvironment(PythonEnvUtils.java:226) ~[flink-python-1.20.1.jar:1.20.1]
    at org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient(PythonEnvUtils.java:487) ~[flink-python-1.20.1.jar:1.20.1]
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:92) ~[flink-python-1.20.1.jar:1.20.1]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_432]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_432]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_432]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_432]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-1.20.1.jar:1.20.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_432]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_432]
    at org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:172) ~[?:?]
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59) [flink-rpc-akka012286be-5b7e-4653-adc9-22e2fd665221.jar:1.20.1]
    at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:61) [flink-rpc-akka012286be-5b7e-4653-adc9-22e2fd665221.jar:1.20.1]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_432]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_432]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_432]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_432]
2025-04-01 23:36:28,353 WARN  org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap [] - Application failed unexpectedly: 
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_432]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_432]
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_432]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_432]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_432]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_432]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-1.20.1.jar:1.20.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_432]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_432]
    at org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:172) ~[?:?]
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59) [flink-rpc-akka012286be-5b7e-4653-adc9-22e2fd665221.jar:1.20.1]
    at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:61) [flink-rpc-akka012286be-5b7e-4653-adc9-22e2fd665221.jar:1.20.1]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_432]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_432]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_432]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_432]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    ... 13 more
Caused by: org.apache.flink.client.program.ProgramAbortException: java.nio.file.NoSuchFileException: /tmp/pyflink/943c336a-9d42-4b99-befe-594ea803a863/fd17716e-50db-4077-9e29-37d4ff43a8fd/streaming_word_count.py
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134) ~[flink-python-1.20.1.jar:1.20.1]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_432]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_432]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_432]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_432]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.20.1.jar:1.20.1]
    ... 12 more
Caused by: java.nio.file.NoSuchFileException: /tmp/pyflink/943c336a-9d42-4b99-befe-594ea803a863/fd17716e-50db-4077-9e29-37d4ff43a8fd/streaming_word_count.py
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) ~[?:1.8.0_432]
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) ~[?:1.8.0_432]
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) ~[?:1.8.0_432]
    at sun.nio.fs.UnixPath.toRealPath(UnixPath.java:837) ~[?:1.8.0_432]
    at org.apache.flink.client.python.PythonEnvUtils.addToPythonPath(PythonEnvUtils.java:309) ~[flink-python-1.20.1.jar:1.20.1]
    at org.apache.flink.client.python.PythonEnvUtils.preparePythonEnvironment(PythonEnvUtils.java:226) ~[flink-python-1.20.1.jar:1.20.1]
    at org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient(PythonEnvUtils.java:487) ~[flink-python-1.20.1.jar:1.20.1]
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:92) ~[flink-python-1.20.1.jar:1.20.1]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_432]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_432]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_432]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_432]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.20.1.jar:1.20.1]
    ... 12 more
2025-04-01 23:36:28,356 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_432]
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_432]
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957) ~[?:1.8.0_432]
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940) ~[?:1.8.0_432]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_432]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_432]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[flink-dist-1.20.1.jar:1.20.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_432]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_432]
    at org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:172) ~[?:?]
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.pekko.dispatch.TaskInvocation.run(AbstractDispatcher.scala:59) [flink-rpc-akka012286be-5b7e-4653-adc9-22e2fd665221.jar:1.20.1]
    at org.apache.pekko.dispatch.ForkJoinExecutorConfigurator$PekkoForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:61) [flink-rpc-akka012286be-5b7e-4653-adc9-22e2fd665221.jar:1.20.1]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_432]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_432]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_432]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_432]
Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
    ... 13 more
Caused by: org.apache.flink.client.program.ProgramAbortException: java.nio.file.NoSuchFileException: /tmp/pyflink/943c336a-9d42-4b99-befe-594ea803a863/fd17716e-50db-4077-9e29-37d4ff43a8fd/streaming_word_count.py
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134) ~[flink-python-1.20.1.jar:1.20.1]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_432]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_432]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_432]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_432]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.20.1.jar:1.20.1]
    ... 12 more
Caused by: java.nio.file.NoSuchFileException: /tmp/pyflink/943c336a-9d42-4b99-befe-594ea803a863/fd17716e-50db-4077-9e29-37d4ff43a8fd/streaming_word_count.py
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) ~[?:1.8.0_432]
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) ~[?:1.8.0_432]
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) ~[?:1.8.0_432]
    at sun.nio.fs.UnixPath.toRealPath(UnixPath.java:837) ~[?:1.8.0_432]
    at org.apache.flink.client.python.PythonEnvUtils.addToPythonPath(PythonEnvUtils.java:309) ~[flink-python-1.20.1.jar:1.20.1]
    at org.apache.flink.client.python.PythonEnvUtils.preparePythonEnvironment(PythonEnvUtils.java:226) ~[flink-python-1.20.1.jar:1.20.1]
    at org.apache.flink.client.python.PythonEnvUtils.launchPy4jPythonClient(PythonEnvUtils.java:487) ~[flink-python-1.20.1.jar:1.20.1]
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:92) ~[flink-python-1.20.1.jar:1.20.1]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_432]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_432]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_432]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_432]
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113) ~[flink-dist-1.20.1.jar:1.20.1]
    at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.20.1.jar:1.20.1]
    ... 12 more
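
One detail in the log is easy to miss among the repeated traces: the missing file is named streaming_word_count.py, while the submit command passes word_count2.py. The sketch below pulls the distinct paths out of the `NoSuchFileException` lines and compares their file names with the submitted script. It is a rough diagnostic helper under the assumption that the log has been saved as text; the function names are made up for illustration.

```python
import re
from pathlib import PurePosixPath

# Matches the path that follows the exception class name in the log.
MISSING_FILE = re.compile(r"java\.nio\.file\.NoSuchFileException: (\S+)")


def missing_files(log_text):
    """Return the distinct paths reported as missing, in first-seen order."""
    seen = []
    for path in MISSING_FILE.findall(log_text):
        if path not in seen:
            seen.append(path)
    return seen


def compare_with_submitted(log_text, submitted_script):
    """Pair each missing path with whether its file name matches the script."""
    submitted_name = PurePosixPath(submitted_script).name
    return [(path, PurePosixPath(path).name == submitted_name)
            for path in missing_files(log_text)]


if __name__ == "__main__":
    sample = (
        "Caused by: java.nio.file.NoSuchFileException: "
        "/tmp/pyflink/943c336a-9d42-4b99-befe-594ea803a863/"
        "fd17716e-50db-4077-9e29-37d4ff43a8fd/streaming_word_count.py\n"
    )
    for path, same_name in compare_with_submitted(
            sample, "flink-1.20.1/word_count2.py"):
        print(path, "matches submitted script:", same_name)
```

A mismatch like this may simply mean the log was captured from a different submission than the command shown, so it is worth confirming that the two belong together before drawing conclusions.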