I am new to Flink, and I am trying to connect an AWS Flink application to the AWS Glue catalog. I created the Flink application through the AWS CLI, but when I run it I get the following error:
Internal server error.<Exception on server side: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file. at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.handleRequest(JarRunOverrideHandler.java:262) at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandl
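For context, I created the application roughly like the sketch below (the application name, role ARN, and file key here are placeholders, not my exact values):
aws kinesisanalyticsv2 create-application \
    --application-name my-flink-app \
    --runtime-environment FLINK-1_19 \
    --service-execution-role arn:aws:iam::111122223333:role/my-flink-role \
    --application-configuration '{
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation": {
                    "BucketARN": "arn:aws:s3:::xxxx.dev.data-platform.temporary",
                    "FileKey": "my_flink_app.zip"
                }
            },
            "CodeContentType": "ZIPFILE"
        }
    }'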
My project directory is structured as follows:
my_flink_app.zip
├── main.py
└── jar files (3 jar files)
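I build the zip roughly like this (assuming main.py and the three jars sit together in one folder):
cd my_flink_app
zip -r ../my_flink_app.zip main.py *.jar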
(Here is my main.py, which sets up the connectivity from Flink to the AWS Glue catalog.)
#Pre-requisites:
#HADOOP_VERSION=2.8.5
#HADOOP_HOME=`pwd`/hadoop-${HADOOP_VERSION}
#export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
# Set up Flink environment
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)
s3_jar_paths = [
    "s3://xxxx.dev.data-platform.temporary/warehouse/iceberg-flink-runtime-1.19-1.7.1.jar",
    "s3://xxxx.dev.data-platform.temporary/warehouse/iceberg-aws-bundle-1.7.1.jar",
    "s3://xxxx.dev.data-platform.temporary/warehouse/flink-s3-fs-hadoop-1.19.1.jar"
]
# Note: the PyFlink docs separate "pipeline.jars" URLs with ";" rather than ","
table_env.get_config().set("pipeline.jars", ";".join(s3_jar_paths))
# Create an Iceberg catalog backed by the AWS Glue catalog
table_env.execute_sql("""
CREATE CATALOG glue WITH (
'type'='iceberg',
'warehouse'='s3://xxxx.dev.data-platform.temporary/hive_to_glue/',
'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'
)
""")
table_env.execute_sql("show catalogs").print() # Show the available catalogs
table_env.execute_sql("use catalog glue") # Use the Glue catalog
table_env.execute_sql("show databases").print()
table_env.execute_sql("use events_dev") # Use the Glue database
table_env.execute_sql("show tables").print()
table_env.sql_query("SELECT * FROM document_service_create_document_rejected LIMIT 5").execute().print()
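Locally, I simply run python main.py (after exporting the HADOOP_* variables listed in the prerequisites at the top of the file).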
The above code works as expected on my local machine, but when I deploy it to my AWS Flink application by uploading the zip file to S3, the job fails with: org.apache.flink.client.program.ProgramInvocationException: Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file.
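For completeness, I upload the new zip roughly like this before restarting the application (the object key is a placeholder):
aws s3 cp my_flink_app.zip s3://xxxx.dev.data-platform.temporary/my_flink_app.zip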
Can someone please explain how to package and upload the zip file to S3? Specifically:
- What structure does the zip file need to follow?
- Since I have multiple jar files, how should I arrange them inside the zip?