I have a topic that I need to read from a Kafka server, so I just need to create a consumer that can read data from that Kafka topic. I always get an error saying the topic does not exist.
1- How can I make sure the Kafka connection is established?
2- How do I get the data from a specific topic in Kafka?
main.js
// Script that creates a kafka-node HighLevelConsumer and logs every message it receives.
var kafka = require('kafka-node');
// Project-local configuration module; only config.kafkaCon.dit is read from it here.
var config = require('./config.js');
// Value handed to the kafka-node Client constructor below.
var kafkaConn = config.kafkaCon.dit;
var HighLevelConsumer = kafka.HighLevelConsumer;
//var HighLevelProducer = kafka.HighLevelProducer;
var Client = kafka.Client;
// NOTE(review): Offset is assigned but never used in this script.
var Offset = kafka.Offset;
// Array containing a single object that names the 'UEQ' topic.
var topics = [{topic: 'UEQ'}];
var client = new Client(kafkaConn);
// NOTE(review): `topic` is set to the whole `topics` array, not to the string 'UEQ'.
// That matches the "[object Object]" printed in the reported TopicsNotExistError.
// Presumably the consumer expects `topic` to be a topic-name string - confirm against
// the kafka-node documentation.
var payloads = [ { topic: topics, partition : 0}];
// Options object passed as the third argument to HighLevelConsumer.
var options = {
groupId: 'kafka-node-group',
// Auto commit config
autoCommit: true,
autoCommitMsgCount: 100,
autoCommitIntervalMs: 5000,
// Fetch message config
fetchMaxWaitMs: 100,
fetchMinBytes: 1,
fetchMaxBytes: 1024 * 10,
};
var consumer = new HighLevelConsumer(client, payloads, options);
// Log each received message.
// NOTE(review): only a 'message' listener is registered; there is no 'error' listener,
// which is consistent with the reported "Unhandled 'error' event" crash.
consumer.on('message', function (message) {
// NOTE(review): `this` is whatever the emitter binds for listeners - presumably the
// consumer instance; confirm that it actually exposes an `id` property.
console.log('TEST',this.id, message);
});
error
events.js:141
throw er; // Unhandled 'error' event
^
TopicsNotExistError: The topic(s) [object Object] do not exist
at new TopicsNotExistError (C:\uilogging\node_modules\kafka-node\lib\errors\
TopicsNotExistError.js:11:11)
I have a topic that I need to read from a Kafka server, so I just need to create a consumer that can read data from that Kafka topic. I always get an error saying the topic does not exist.
1- How can I make sure the Kafka connection is established?
2- How do I get the data from a specific topic in Kafka?
main.js
// Script that creates a kafka-node HighLevelConsumer and logs every message it receives.
var kafka = require('kafka-node');
// Project-local configuration module; only config.kafkaCon.dit is read from it here.
var config = require('./config.js');
// Value handed to the kafka-node Client constructor below.
var kafkaConn = config.kafkaCon.dit;
var HighLevelConsumer = kafka.HighLevelConsumer;
//var HighLevelProducer = kafka.HighLevelProducer;
var Client = kafka.Client;
// NOTE(review): Offset is assigned but never used in this script.
var Offset = kafka.Offset;
// Array containing a single object that names the 'UEQ' topic.
var topics = [{topic: 'UEQ'}];
var client = new Client(kafkaConn);
// NOTE(review): `topic` is set to the whole `topics` array, not to the string 'UEQ'.
// That matches the "[object Object]" printed in the reported TopicsNotExistError.
// Presumably the consumer expects `topic` to be a topic-name string - confirm against
// the kafka-node documentation.
var payloads = [ { topic: topics, partition : 0}];
// Options object passed as the third argument to HighLevelConsumer.
var options = {
groupId: 'kafka-node-group',
// Auto commit config
autoCommit: true,
autoCommitMsgCount: 100,
autoCommitIntervalMs: 5000,
// Fetch message config
fetchMaxWaitMs: 100,
fetchMinBytes: 1,
fetchMaxBytes: 1024 * 10,
};
var consumer = new HighLevelConsumer(client, payloads, options);
// Log each received message.
// NOTE(review): only a 'message' listener is registered; there is no 'error' listener,
// which is consistent with the reported "Unhandled 'error' event" crash.
consumer.on('message', function (message) {
// NOTE(review): `this` is whatever the emitter binds for listeners - presumably the
// consumer instance; confirm that it actually exposes an `id` property.
console.log('TEST',this.id, message);
});
error
events.js:141
throw er; // Unhandled 'error' event
^
TopicsNotExistError: The topic(s) [object Object] do not exist
at new TopicsNotExistError (C:\uilogging\node_modules\kafka-node\lib\errors\
TopicsNotExistError.js:11:11)
Share
Improve this question
edited Jun 21, 2016 at 20:52
hussain
asked Jun 20, 2016 at 20:37
hussainhussain
7,14121 gold badges87 silver badges165 bronze badges
2 Answers
Reset to default
4
I am doing a similar project where I have a Kafka producer on its own server and am using kafka-node as a consumer for my application. I am fairly new to kafka-node and don't have much experience with it, but I can try to share some of the insights I have found.
I believe your problem is literally that your topic doesn't exist.
1. How can I make sure the Kafka connection is established?
If your connection wasn't established, I don't think it would get as far as saying the topic doesn't exist. When I type in a topic that doesn't exist and a random IP for my Kafka producer, nothing errors out. But when I point to the correct IP and still have an incorrect topic, I get the same error you see.
2. This code is working for my application
// Minimal kafka-node consumer: subscribes to one topic/partition and logs
// each message after decoding it as JSON.
var kafka = require('kafka-node');
var Consumer = kafka.Consumer;

// The client specifies the ip of the Kafka producer and uses
// the zookeeper port 2181
var client = new kafka.Client("<ip to producer>:2181");

// The consumer object specifies the client and topic(s) it subscribes to.
// `topic` must be the topic name as a plain string.
var consumer = new Consumer(
  client,
  [ { topic: 'myTopic', partition: 0 } ],
  { autoCommit: false }
);

consumer.on('message', function (message) {
  // grab the main content from the Kafka message
  var data;
  try {
    data = JSON.parse(message.value);
  } catch (err) {
    // A payload that is not valid JSON should not take the whole process down.
    console.error('Skipping message with non-JSON value:', err.message);
    return;
  }
  console.log(data);
});

// Without an 'error' listener, Node rethrows any emitted error as an
// "Unhandled 'error' event" and the process exits (e.g. TopicsNotExistError).
consumer.on('error', function (err) {
  console.error('Kafka consumer error:', err);
});
Hopefully this doesn't find you too late.
If you need this for debugging/development purposes then just add the following imports (the following code is in ES6 format) and it should console.log out a message when the connection is established or if there were any failure messages:
// Excerpt from a class (the surrounding class body is not shown).
// Loads kafka-node's logging module and keeps a reference to it on the instance.
this.kafkaLogging = require('kafka-node/logging');
// Registers the getLoggerProvider method itself (it is passed uncalled) as the logger provider.
// NOTE(review): presumably kafka-node invokes the provider to obtain a logger object - confirm
// against the kafka-node logging docs, including whether this must run before the rest of
// kafka-node is loaded.
this.kafkaLogging.setLoggerProvider(this.getLoggerProvider);
...
/**
 * Builds a logger object whose debug/info/warn/error methods all write via console.log.
 * Does not read instance state.
 * @returns {{debug: Function, info: Function, warn: Function, error: Function}}
 */
getLoggerProvider() {
return {
debug: console.log.bind(console),
info : console.log.bind(console),
warn : console.log.bind(console),
error: console.log.bind(console)
};
}