Inserting data into a Kafka topic with Flink SQL using the Kubernetes operator - Stack Overflow

I am trying to run the following code:

CREATE TABLE IF NOT EXISTS some_source_table
(
    myField1     VARCHAR,
    myField2      VARCHAR
) WITH (
    'connector' = 'kafka',
    'topic' = 'demo',
    'properties.bootstrap.servers' = '***',
    'properties.group.id' = 'some-id-1',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json',
    'json.timestamp-format.standard' = 'ISO-8601',
    'scan.topic-partition-discovery.interval' = '60000',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=*** password=***;',
    'properties.ssl.endpoint.identification.algorithm' = 'https'
);

CREATE TABLE IF NOT EXISTS some_sink_table
(
    myField1     VARCHAR,
    myField2      VARCHAR,
    PRIMARY KEY (`myField1`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'demosink',
    'properties.bootstrap.servers' = '***',
    'key.format' = 'json',
    'value.format' = 'json',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=*** password=***;',
    'properties.ssl.endpoint.identification.algorithm' = 'https'
);

INSERT INTO some_sink_table SELECT * FROM some_source_table;

I can see that I am consuming data in the Confluent platform, but no data is produced to the demosink topic.

I have tried to add the dependencies from the example given here.

I have tried to add the following dependencies:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.1.0-1.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka</artifactId>
            <version>3.1.0-1.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

I use Flink version 1.19.1. Am I missing a dependency, or are some of my dependencies incompatible? I have also tried copying the flink-sql-connector-kafka and flink-clients .jar files into the lib folder in the Dockerfile.
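
If the connector version has to match the Flink minor release, the 3.1.0-1.18 artifacts above may simply be the wrong build for 1.19.1. A dependency matching Flink 1.19 might look like the sketch below (I am assuming 3.2.0-1.19 is the connector release line built against Flink 1.19; the exact version should be verified on Maven Central):

        <!-- Hypothetical 1.19-compatible connector version; verify on Maven Central -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka</artifactId>
            <version>3.2.0-1.19</version>
        </dependency>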

asked Nov 28, 2024 at 9:56 by user28527275
  • Do you have any error messages? Normally you can't read a standard Kafka topic ('connector' = 'kafka') and insert directly into 'upsert-kafka'; 'upsert-kafka' tables require you to define a PRIMARY KEY constraint. – Niko Commented Nov 28, 2024 at 10:13
  • I get Error connecting to node b5-{BOOTSTRAP_SERVER} (id: 5 rack: 2) java.net.UnknownHostException: b5-{BOOTSTRAP_SERVER}: Name or service not known. I get this error message with b0-, b1-, b2- and b5-. I have PRIMARY KEY (`myField1`) NOT ENFORCED in the upsert-kafka table; is this not how you add a PRIMARY KEY? @Niko – user28527275 Commented Nov 28, 2024 at 11:21
  • I don't know the real problem with your bootstrap server. Try adding the error log to your post. Flink probably can't access the servers because your Kafka container is not sharing the same network with Flink. You have a PRIMARY KEY in your sink table, but your source table doesn't have one. Read this doc for a better understanding – Niko Commented Nov 28, 2024 at 13:09
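
One way to sanity-check the network hypothesis from these comments is to resolve the failing broker hostnames from inside a Flink pod. A minimal sketch, assuming a task manager pod named flink-taskmanager-0 (both the pod name and the masked hostname are placeholders, not values from the question):

        # Try to resolve one of the failing broker hostnames from inside the Flink pod;
        # if this fails too, the pod's network/DNS cannot see the brokers at all.
        kubectl exec -it flink-taskmanager-0 -- getent hosts b0-{BOOTSTRAP_SERVER}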

1 Answer

The problem was with properties.sasl.jaas.config. Duplicate of this
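
For context, a minimal sketch of what a working value usually looks like with the PLAIN mechanism: the login module is referenced by its fully qualified class name, and the username and password values are quoted as the JAAS syntax expects (credentials stay masked as in the question):

    -- fully qualified login module class, with quoted credential values (masked)
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="***" password="***";'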
