java - KafkaProducer: How to raise and catch UNKNOWN_TOPIC_OR_PARTITION error? - Stack Overflow

I have a Kafka cluster with topic auto-creation disabled. All topics must be created through an external API, and I want to implement topic auto-creation myself =)

try { producer.send(...) } catch (UnknownTopicException e) { ... external API call ... }

By default, KafkaProducer does not throw an exception when sending to an unknown topic. It only logs UNKNOWN_TOPIC_OR_PARTITION errors endlessly:

[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {unknown_topic=UNKNOWN_TOPIC_OR_PARTITION}

I worked around this by decreasing the max.block.ms configuration. Now KafkaProducer fails with an exception instead of retrying:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Main {
    public static void main(String[] args) {
        var properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "***:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty("max.block.ms", "0");

        try (var producer = new KafkaProducer<String, String>(properties)) {
            System.out.println("recordMetadata: " + producer.send(new ProducerRecord<>("unknown_topic", "hello world")).get());
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

This doesn't seem to be the best solution. What other options are there?

asked Mar 31 at 13:03 by Makrushin Evgenii

2 Answers

The best way to check whether the topic exists is to ask the AdminClient before sending.

It has a describeTopics method. If the topic does not exist, the futures in the returned result fail with UnknownTopicOrPartitionException (wrapped in an ExecutionException when you call get()), so the way to go is to catch it.

https://kafka.apache.org/23/javadoc/org/apache/kafka/clients/admin/AdminClient.html#describeTopics-java.util.Collection-org.apache.kafka.clients.admin.DescribeTopicsOptions-

You could just modify your try block:

//...

    try (var adminClient = AdminClient.create(properties);
         var producer = new KafkaProducer<String, String>(properties)) {

        if (topicExists(adminClient, "yourTopic")) {
            System.out.println("recordMetadata: " + producer.send(new ProducerRecord<>("yourTopic", "hi!")).get());
            //...
        } else {
            // The topic doesn't exist: topicExists caught the exception and returned false.
            // Your missing-topic logic here.
        }

        //...
    } catch (ExecutionException | InterruptedException e) {
        throw new RuntimeException(e);
    }
//...

// Requires org.apache.kafka.clients.admin.DescribeTopicsResult and
// org.apache.kafka.common.errors.UnknownTopicOrPartitionException.
private boolean topicExists(AdminClient adminClient, String topic) throws InterruptedException {
    try {
        DescribeTopicsResult result = adminClient.describeTopics(java.util.List.of(topic));
        // On newer clients, allTopicNames() replaces the deprecated all().
        result.all().get();
        return true; // OK, it exists
    } catch (ExecutionException e) {
        if (e.getCause() instanceof UnknownTopicOrPartitionException) {
            return false; // UnknownTopicOrPartition means it doesn't exist
        }
        throw new RuntimeException(e);
    }
}
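
The check in topicExists depends on how Java futures report failures: get() wraps the real error in an ExecutionException, so the cause has to be inspected. Below is a minimal, Kafka-free sketch of that unwrapping. MissingTopicException and FutureCauseCheck are hypothetical names used only for illustration; MissingTopicException stands in for UnknownTopicOrPartitionException, and a CompletableFuture stands in for the future returned by describeTopics(...).all().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical stand-in for Kafka's UnknownTopicOrPartitionException.
class MissingTopicException extends RuntimeException {
    MissingTopicException(String message) {
        super(message);
    }
}

// Hypothetical helper, not part of any Kafka API.
final class FutureCauseCheck {
    private FutureCauseCheck() {
    }

    // Returns true if the lookup future completes normally and false if it
    // failed because the topic is missing; any other failure is rethrown.
    static boolean exists(Future<?> lookup) {
        try {
            lookup.get();
            return true;
        } catch (ExecutionException e) {
            if (e.getCause() instanceof MissingTopicException) {
                return false;
            }
            throw new IllegalStateException("Lookup failed for another reason", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the lookup", e);
        }
    }

    // Helper for trying the sketch out: a future that has already failed.
    static Future<Void> failed(Throwable cause) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }
}
```

For example, FutureCauseCheck.exists(FutureCauseCheck.failed(new MissingTopicException("unknown_topic"))) returns false, while a future that failed for any other reason surfaces as an IllegalStateException instead of being mistaken for a missing topic.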

Thanks to @aran for the hint about describeTopics(). I wrote this decorator for KafkaProducer. It redirects records to a backoff topic if the destination topic is unknown or its creation was requested a short time ago:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

import java.time.Instant;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaProducerAutoCreateDecorator<K, V> extends KafkaProducer<K, V> {
    private final Map<String, Long> describedTopics = new HashMap<>();
    private final Map<String, Long> requestedTopics = new HashMap<>();
    private final AdminClient adminClient;

    private static final long describedTopicsTtlMs = 1000;
    private static final long requestedTopicsTtlMs = 1000;
    private static final String defaultTopic = "test";

    public KafkaProducerAutoCreateDecorator(Properties properties) {
        super(properties);
        this.adminClient = AdminClient.create(properties);
    }

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        if (!isTopicExist(record.topic())) {
            // Keep the original key so the redirected record is not stripped of it.
            record = new ProducerRecord<>(defaultTopic, record.key(), record.value());
        }

        return super.send(record, callback);
    }

    private boolean isTopicExist(String topic) {
        if (describedTopics.containsKey(topic) && Instant.now().toEpochMilli() - describedTopics.get(topic) < describedTopicsTtlMs) {
            return true;
        }

        if (requestedTopics.containsKey(topic) && Instant.now().toEpochMilli() - requestedTopics.get(topic) < requestedTopicsTtlMs) {
            return false;
        }

        try {
            adminClient.describeTopics(List.of(topic)).all().get();
            describedTopics.put(topic, Instant.now().toEpochMilli());
            requestedTopics.remove(topic);
            return true;
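        } catch (ExecutionException e) {
            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                // TODO: request topic creation
                requestedTopics.put(topic, Instant.now().toEpochMilli());
                return false;
            }
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }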
    }

    @Override
    public void close() {
        adminClient.close();
        super.close();
    }
}

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Main {
    public static void main(String[] args) {
        var properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "***:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        var records = new ProducerRecord[]{
                new ProducerRecord<>("test", "..."),
                new ProducerRecord<>("unknown_topic", "..."),
                new ProducerRecord<>("test", "..."),
                new ProducerRecord<>("unknown_topic", "...")
        };

        try (var producer = new KafkaProducerAutoCreateDecorator<String, String>(properties)) {
            for (var record : records) {
                System.out.println(producer.send(record).get());
            }
        } catch (ExecutionException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

I'm not sure what will happen if a topic that has already been described is deleted.
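
On that open question: in the decorator above, a positive result is only trusted for describedTopicsTtlMs, so a deleted topic should be looked up again once its entry expires. Here is a minimal, Kafka-free sketch of that expiry logic. TopicSeenCache is a hypothetical helper, not part of the decorator; it takes an injectable clock so the behaviour can be checked without waiting, and it uses ConcurrentHashMap because a producer is typically shared between threads, which the plain HashMap in the decorator does not handle safely.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical helper: remembers when a topic was last seen and forgets it after a TTL.
final class TopicSeenCache {
    private final Map<String, Long> seenAtMs = new ConcurrentHashMap<>();
    private final long ttlMs;
    private final LongSupplier clockMs;

    TopicSeenCache(long ttlMs, LongSupplier clockMs) {
        this.ttlMs = ttlMs;
        this.clockMs = clockMs;
    }

    // Records that the topic was observed at the current clock time.
    void markSeen(String topic) {
        seenAtMs.put(topic, clockMs.getAsLong());
    }

    // True only while the entry is younger than the TTL; expired entries are dropped.
    boolean isFresh(String topic) {
        Long seenAt = seenAtMs.get(topic);
        if (seenAt == null) {
            return false;
        }
        if (clockMs.getAsLong() - seenAt < ttlMs) {
            return true;
        }
        // Remove only if the entry was not refreshed by another thread meanwhile.
        seenAtMs.remove(topic, seenAt);
        return false;
    }
}
```

In real use the clock would be System::currentTimeMillis. Within the TTL window a deleted topic would still be treated as existing, so the TTL bounds how long records can be sent to a topic that is already gone.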
