Followed some solutions like this And
official doc
But it seems above work for java setup but not other like Nodejs/NestJs.
I tried with following code but fails connecting to broker/topic:
private kafka: Kafka;
private consumers: Consumer[] = [];
async onApplicationShutdown() {
await this.disconnectConsumers();
}
private async disconnectConsumers() {
try {
for (const consumer of this.consumers) {
await consumer.disconnect();
}
} catch (error) {
console.error('consumer failed to disconnect properly', error);
}
}
async consume(
groupId: string,
topic: ConsumerSubscribeTopic,
brokers: any,
config: ConsumerRunConfig,
) {
try {
this.registerLZ4Codec();
this.kafka = new Kafka({
clientId: groupId,
brokers: brokers,
connectionTimeout: kafkaConfig.connectionTimeout,
authenticationTimeout: kafkaConfig.authenticationTimeout,
reauthenticationThreshold: kafkaConfig.reauthenticationThreshold,
requestTimeout: kafkaConfig.requestTimeout,
retry: { retries: kafkaConfig.retry.retries },
ssl: {
rejectUnauthorized: false,
ca: [fs.readFileSync('src/config/certs/truststore.pem', 'utf-8')],
key: fs.readFileSync('src/config/certs/keystore.pem', 'utf-8'),
cert: fs.readFileSync('src/config/certs/keystore.pem', 'utf-8'),
passphrase: 'my-password',
},
logLevel: logLevel.ERROR
});
const consumer = await this.createConsumer(groupId);
await this.subscribeConsumer(consumer, topic);
await this.runConsumer(consumer, config);
this.consumers.push(consumer);
} catch (error) {
console.error(
'There is an error in topic consume:',
groupId,
topic,
config,
error,
);
}
}
private async createConsumer(groupId: string): Promise<Consumer> {
const consumer = this.kafka.consumer({ groupId });
await consumer.connect().catch((e) =>
console.error(`groupId: ${groupId}. consumer connection failed:`, e)
);
return consumer;
}
private async subscribeConsumer(consumer: Consumer, topic: ConsumerSubscribeTopic) {
await consumer.subscribe(topic).catch((e) =>
console.error(`topic: ${topic}. consumer subscription failed:`, e)
);
}
private async runConsumer(consumer: Consumer, config: ConsumerRunConfig) {
await consumer.run(config).catch((e) =>
console.error(`consumer failed to run with config:`, config, e)
);
}
private registerLZ4Codec() {
const LZ4Codec = {
async compress(encoder) {
return encode(encoder);
},
async decompress(buffer) {
return decode(buffer);
},
};
CompressionCodecs[CompressionTypes.LZ4] = () => LZ4Codec;
}
Error:
{"level":"ERROR","timestamp":"2025-03-20T11:41:37.106Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Failed to connect: error:0308010C:digital envelope routines::unsupported","retryCount":2,"retryTime":758}