I am trying to run the following code:
CREATE TABLE IF NOT EXISTS some_source_table
(
    myField1 VARCHAR,
    myField2 VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'demo',
    'properties.bootstrap.servers' = '***',
    'properties.group.id' = 'some-id-1',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601',
    'scan.topic-partition-discovery.interval' = '60000',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=*** password=***;',
    'properties.ssl.endpoint.identification.algorithm' = 'https'
);
CREATE TABLE IF NOT EXISTS some_sink_table
(
    myField1 VARCHAR,
    myField2 VARCHAR,
    PRIMARY KEY (`myField1`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'demosink',
    'properties.bootstrap.servers' = '***',
    'key.format' = 'json',
    'value.format' = 'json',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=*** password=***;',
    'properties.ssl.endpoint.identification.algorithm' = 'https'
);
INSERT INTO some_sink_table SELECT * FROM some_source_table;
I can see that I am consuming data in the Confluent platform, but no data is produced to the demosink topic.
I have tried to add dependencies following the example given here. Specifically, I added the following:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.1.0-1.18</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka</artifactId>
    <version>3.1.0-1.18</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
I use Flink version 1.19.1. Am I missing a dependency, or are some of my dependencies incompatible? I have also tried copying the flink-sql-connector-kafka and flink-clients .jar files into the lib folder inside the Dockerfile.
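For comparison, would I need the connector version aligned to Flink 1.19 instead of 1.18? A sketch of what I mean (assuming a 3.2.0-1.19 release of the Kafka connector exists for Flink 1.19; the version here is my guess, not something I have verified):

<!-- Sketch: Kafka connector version aligned to Flink 1.19 (3.2.0-1.19 is assumed) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka</artifactId>
    <version>3.2.0-1.19</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>

As far as I understand, flink-sql-connector-kafka is an uber-jar that already bundles flink-connector-kafka and the Kafka client classes, so listing both connector artifacts should be redundant.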
Comments:

Niko (Nov 28, 2024 at 10:13): […] ('connector' = 'kafka') and insert directly into 'upsert-kafka'. 'upsert-kafka' tables require defining a PRIMARY KEY constraint.

user28527275 (Nov 28, 2024 at 11:21): I get the error "Error connecting to node b5-{BOOTSTRAP_SERVER} (id: 5 rack: 2) java.net.UnknownHostException: b5-{BOOTSTRAP_SERVER}: Name or service not known." I see this error message with b0-, b1-, b2- and b5-. I have PRIMARY KEY (`myField1`) NOT ENFORCED in the upsert-kafka table; is this not how you add a PRIMARY KEY? @Niko

1 Answer

The problem was with 'properties.sasl.jaas.config'. Duplicate of this question.
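For reference, a likely corrected value, assuming the problem was the login module class name and the unquoted credentials (the exact fix is in the linked duplicate; <user> and <password> are placeholders):

'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="<user>" password="<password>";'

Kafka's JAAS configuration expects the fully qualified class name of the login module, and the Kafka documentation shows the username and password values as double-quoted strings.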