I have Kafka with disabled topic auto creation. All topics must be created with external API. And I want to implement topic auto creation =)
try { producer.send(...) } catch (UnknownTopicException e) { ... external API call ... }
By default, KafkaProducer
do not throw exception on send to unknown topic. Only logs endlessly UNKNOWN_TOPIC_OR_PARTITION
errors:
[kafka-producer-network-thread | producer-1] WARN .apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {unknown_topic=UNKNOWN_TOPIC_OR_PARTITION}
I fix this with decreasing max.block.ms
configuration. Now KafkaProducer
don't retry before thrown the exception:
import .apache.kafka.clients.producer.*;
import .apache.kafkamon.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class Main {
public static void main(String[] args) {
var properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "***:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty("max.block.ms", "0");
try (var producer = new KafkaProducer<String, String>(properties)) {
System.out.println("recordMetadata: " + producer.send(new ProducerRecord<>("unknown_topic", "hello world")).get());
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}
This doesn't seem to be the best solution. What other options are there?
I have Kafka with disabled topic auto creation. All topics must be created with external API. And I want to implement topic auto creation =)
try { producer.send(...) } catch (UnknownTopicException e) { ... external API call ... }
By default, KafkaProducer
do not throw exception on send to unknown topic. Only logs endlessly UNKNOWN_TOPIC_OR_PARTITION
errors:
[kafka-producer-network-thread | producer-1] WARN .apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {unknown_topic=UNKNOWN_TOPIC_OR_PARTITION}
I fix this with decreasing max.block.ms
configuration. Now KafkaProducer
don't retry before thrown the exception:
import .apache.kafka.clients.producer.*;
import .apache.kafkamon.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class Main {
public static void main(String[] args) {
var properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "***:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty("max.block.ms", "0");
try (var producer = new KafkaProducer<String, String>(properties)) {
System.out.println("recordMetadata: " + producer.send(new ProducerRecord<>("unknown_topic", "hello world")).get());
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}
This doesn't seem to be the best solution. What other options are there?
Share Improve this question asked Mar 31 at 13:03 Makrushin EvgeniiMakrushin Evgenii 1,2212 gold badges11 silver badges23 bronze badges2 Answers
Reset to default 1The best option to check if the topic exists is to call AdminClient
before any call.
It has this method: describeTopics
. That method throws the UNKNOWN_TOPIC_OR_PARTITION
exception, so the way to go is to catch it.
https://kafka.apache./23/javadoc//apache/kafka/clients/admin/AdminClient.html#describeTopics-java.util.Collection-.apache.kafka.clients.admin.DescribeTopicsOptions-
You could just modify your try block:
//...
try (var adminClient = AdminClient.create(properties);
var producer = new KafkaProducer<String, String>(properties))
{
if (topicExists(adminClient, "yourTopic"))
{
System.out.println("recordMetadata: " + producer.send(new ProducerRecord<>("yourTopic", "hi!")).get());
//...
}
else //well, it doesn't exist as we catched the exception and returned false
{
//your non-topic logic here.
}
//...
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
//...
private boolean topicExists(AdminClient adminClient, String topic)
{
try
{
DescribeTopicsResult result = adminClient.describeTopics(java.util.List.of(topic));
result.all().get();
return true; //OK, it exists
}
catch (ExecutionException e)
{
if (e.getCause() instanceof UnknownTopicOrPartitionException)
return false; //catch the false -- UnknownTopicOrPartition means it doesnt exist
else
throw new RuntimeException(e);
}
}
Thank @aran for this hint with describeTopics()
. Write this decorator for KafkaProducer
. Redirect records to backoff-topic, If destination topic is unknown or creation is requested a short time ago
import .apache.kafka.clients.admin.AdminClient;
import .apache.kafka.clients.producer.Callback;
import .apache.kafka.clients.producer.KafkaProducer;
import .apache.kafka.clients.producer.ProducerRecord;
import .apache.kafka.clients.producer.RecordMetadata;
import .apache.kafkamon.errors.UnknownTopicOrPartitionException;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class KafkaProducerAutoCreateDecorator<K, V> extends KafkaProducer<K, V> {
private final Map<String, Long> describedTopics = new HashMap<>();
private final Map<String, Long> requestedTopics = new HashMap<>();
private final AdminClient adminClient;
private static final long describedTopicsTtlMs = 1000;
private static final long requestedTopicsTtlMs = 1000;
private static final String defaultTopic = "test";
public KafkaProducerAutoCreateDecorator(Properties properties) {
super(properties);
this.adminClient = AdminClient.create(properties);
}
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
if (!isTopicExist(record.topic())) {
record = new ProducerRecord<>(defaultTopic, record.value());
}
return super.send(record, callback);
}
private boolean isTopicExist(String topic) {
if (describedTopics.containsKey(topic) && Instant.now().toEpochMilli() - describedTopics.get(topic) < describedTopicsTtlMs) {
return true;
}
if (requestedTopics.containsKey(topic) && Instant.now().toEpochMilli() - requestedTopics.get(topic) < requestedTopicsTtlMs) {
return false;
}
try {
adminClient.describeTopics(List.of(topic)).all().get();
describedTopics.put(topic, Instant.now().toEpochMilli());
requestedTopics.remove(topic);
return true;
} catch (ExecutionException | InterruptedException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
// TODO: request topic creation
requestedTopics.put(topic, Instant.now().toEpochMilli());
return false;
}
throw new RuntimeException(e);
}
}
public void close() {
adminClient.close();
super.close();
}
}
import .apache.kafka.clients.producer.*;
import .apache.kafkamon.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class Main {
public static void main(String[] args) {
var properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "***:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
var records = new ProducerRecord[]{
new ProducerRecord<>("test", "..."),
new ProducerRecord<>("unknown_topic", "..."),
new ProducerRecord<>("test", "..."),
new ProducerRecord<>("unknown_topic", "...")
};
try (var producer = new KafkaProducerAutoCreateDecorator<String, String>(properties)) {
for (var record : records) {
System.out.println(producer.send(record).get());
}
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Not sure what will happen if the described topic is deleted