java - How to stop Kafka AdminClient reconnect retries? - Stack Overflow

I'm trying to write a health check for Kafka that runs periodically, checks connectivity, and reports any problems through Prometheus alerts. My idea was simply to make a request to Kafka in a try/catch block and treat any exception as lost connectivity. Code below.

@Scheduled(fixedRate = 1000)
private void checkEventHubConnection() {
    try {
        System.out.println("Checking eventHub connection...");
        adminClient.listTopics().names().get();
        eventHubStatus = 1;
    } catch (ExecutionException ee) {
        eventHubStatusDown(ee);
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        eventHubStatusDown(ie);
    }
}

My problem is with the unit tests. Checking the positive case is simple enough and works. However, when I stop the EmbeddedKafka broker, the client gets stuck in a loop, trying to reconnect for 60 seconds. What I find even weirder is that during these 60 seconds of reconnecting, my scheduled task checkEventHubConnection doesn't run at all. It runs only once at the beginning and once at the end, after the 60 seconds. I suspect this is because the retry operation blocks the thread on which the scheduled task runs?
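The suspicion about the blocked thread can be reproduced without Kafka. The sketch below is only an illustration under assumptions: it uses a plain single-threaded `ScheduledExecutorService` as a stand-in for the scheduler behind `@Scheduled` (which, as far as I know, is single-threaded by default), and a `Thread.sleep` as a stand-in for the blocking `get()`. The class name `BlockedSchedulerDemo` and its methods are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original code.
final class BlockedSchedulerDemo {

    /**
     * Schedules a task at a fixed rate on a single-threaded scheduler.
     * The first run blocks for blockMs; the method returns how many runs
     * have started after observeMs.
     */
    static int countRuns(long periodMs, long blockMs, long observeMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        try {
            scheduler.scheduleAtFixedRate(() -> {
                if (runs.incrementAndGet() == 1) {
                    // Stand-in for a blocking get() against an unreachable broker.
                    sleepQuietly(blockMs);
                }
            }, 0, periodMs, TimeUnit.MILLISECONDS);
            sleepQuietly(observeMs);
            return runs.get();
        } finally {
            scheduler.shutdownNow();
        }
    }

    static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("runs with blocking first run: " + countRuns(50, 1000, 400));
        System.out.println("runs without blocking:        " + countRuns(50, 0, 400));
    }
}
```

With a fixed-rate schedule, executions of the same task never overlap, so a run that blocks delays every later run until it returns. That would match a check that runs once at the start and once after the 60 seconds.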

Is my general approach sound, and if so, how can I fix this? Below are my test setup and the test itself.

public class TestTest {

    private EventHubHealthCheck eventHubHealthCheck;
    private SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
    private AdminClient adminClient;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private MockMvc mockMvc;

    @BeforeAll
    void beforeAll() {
        eventHubHealthCheck = new EventHubHealthCheck(meterRegistry);
        // Point the AdminClient at the embedded broker
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        adminClient = AdminClient.create(props);
    }

    @AfterAll
    void tearDown() {
        if (adminClient != null) {
            adminClient.close();
        }
    }

    @Test
    void shouldReportEventHubStatusDownWhenKafkaIsStopped() throws Exception {
        // Stop the embedded broker to simulate lost connectivity
        embeddedKafka.destroy();

        // Wait long enough for the scheduled health check to report the failure
        Thread.sleep(60000);

        mockMvc.perform(get("/actuator/prometheus"))
                .andExpect(status().isOk())
                .andExpect(content().string(containsString("event_hub_connectivity_status 0.0")));
    }
}
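One direction that might be worth trying is to bound the wait instead of calling `get()` with no timeout. The sketch below is not a confirmed fix. It uses a `CompletableFuture` that never completes as a stand-in for the pending result from an unreachable broker, and the class `BoundedHealthCheck` and its `status` method are hypothetical names. Since `KafkaFuture` implements `java.util.concurrent.Future`, the result of `adminClient.listTopics().names()` could presumably be passed to the same method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the original code.
final class BoundedHealthCheck {

    /** Returns 1 if the future completes normally within timeoutMs, otherwise 0. */
    static int status(Future<?> pending, long timeoutMs) {
        try {
            pending.get(timeoutMs, TimeUnit.MILLISECONDS);
            return 1;
        } catch (TimeoutException | ExecutionException e) {
            // No answer in time, or the request failed: treat both as "down".
            pending.cancel(true);
            return 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return 0;
        }
    }

    public static void main(String[] args) {
        Future<String> healthy = CompletableFuture.completedFuture("topics");
        Future<String> unreachable = new CompletableFuture<>(); // never completes
        System.out.println("healthy:     " + status(healthy, 100));
        System.out.println("unreachable: " + status(unreachable, 100));
    }
}
```

The timeout only limits how long the calling thread waits; the client may keep trying to reconnect in the background. The client-side limits are configured separately, for example through `request.timeout.ms` and `default.api.timeout.ms` in `AdminClientConfig`, or per call with `ListTopicsOptions.timeoutMs`. Whether these fully stop the retries in this setup is something to verify rather than assume.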