最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

node.js - How to connect with Kafka Topic with SSL certificates - Stack Overflow

programmeradmin3浏览0评论

Followed some solutions like this And

official doc

But it seems above work for java setup but not other like Nodejs/NestJs.

I tried with following code but fails connecting to broker/topic:

  private kafka: Kafka;

  private consumers: Consumer[] = [];

  async onApplicationShutdown() {
    await this.disconnectConsumers();
  }

  private async disconnectConsumers() {
    try {
      for (const consumer of this.consumers) {
        await consumer.disconnect();
      }
    } catch (error) {
      console.error('consumer failed to disconnect properly', error);
    }
  }

  async consume(
    groupId: string,
    topic: ConsumerSubscribeTopic,
    brokers: any,
    config: ConsumerRunConfig,
  ) {
    try {
      this.registerLZ4Codec();
      this.kafka = new Kafka({
        clientId: groupId,
        brokers: brokers,
        connectionTimeout: kafkaConfig.connectionTimeout,
        authenticationTimeout: kafkaConfig.authenticationTimeout,
        reauthenticationThreshold: kafkaConfig.reauthenticationThreshold,
        requestTimeout: kafkaConfig.requestTimeout,
        retry: { retries: kafkaConfig.retry.retries },
        ssl: {
          rejectUnauthorized: false,
          ca: [fs.readFileSync('src/config/certs/truststore.pem', 'utf-8')],
          key: fs.readFileSync('src/config/certs/keystore.pem', 'utf-8'),
          cert: fs.readFileSync('src/config/certs/keystore.pem', 'utf-8'),
          passphrase: 'my-password',
        },
        logLevel: logLevel.ERROR
      });
      const consumer = await this.createConsumer(groupId);
      await this.subscribeConsumer(consumer, topic);
      await this.runConsumer(consumer, config);
      this.consumers.push(consumer);
    } catch (error) {
      console.error(
        'There is an error in topic consume:',
        groupId,
        topic,
        config,
        error,
      );
    }
  }

  private async createConsumer(groupId: string): Promise<Consumer> {
    const consumer = this.kafka.consumer({ groupId });
    await consumer.connect().catch((e) =>
      console.error(`groupId: ${groupId}. consumer connection failed:`, e)
    );
    return consumer;
  }

  private async subscribeConsumer(consumer: Consumer, topic: ConsumerSubscribeTopic) {
    await consumer.subscribe(topic).catch((e) =>
      console.error(`topic: ${topic}. consumer subscription failed:`, e)
    );
  }

  private async runConsumer(consumer: Consumer, config: ConsumerRunConfig) {
    await consumer.run(config).catch((e) =>
      console.error(`consumer failed to run with config:`, config, e)
    );
  }


  private registerLZ4Codec() {
    const LZ4Codec = {
      async compress(encoder) {
        return encode(encoder);
      },
      async decompress(buffer) {
        return decode(buffer);
      },
    };
    CompressionCodecs[CompressionTypes.LZ4] = () => LZ4Codec;
  }

Error:

{"level":"ERROR","timestamp":"2025-03-20T11:41:37.106Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: error:0308010C:digital envelope routines::unsupported","retryCount":2,"retryTime":758}

发布评论

评论列表(0)

  1. 暂无评论