I'm trying to write a health check for kafka, that would run periodically, check connectivity and report any problems through prometheus alerts. My idea on how to do it was to simply make a request to kafka in a try catch block, and treat any exception as lost connectivity, code below.
@Scheduled(fixedRate = 1000)
private void checkEventHubConnection() {
try {
System.out.println("Checking eventHub connection...");
adminClient.listTopics().names().get();
eventHubStatus = 1;
} catch (ExecutionException ee) {
eventHubStatusDown(ee);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
eventHubStatusDown(ie);
}
}
My problem is with the unit tests. Checking the positive case is simple enough and works. However, when I want to disable the EmbeddedKafka, it gets stuck in a loop, trying to reconnect for 60 seconds. What I find even weirder, is that during these 60 seconds of reconnecting, my scheduled task checkEventHubConnection
doesn't run at all. It only runs once at the beginning, and once at the end, after the 60 seconds. I'm suspecting it's because the retry operation locks the thread where the scheduled task runs?
Any opinions on if my general approach is correct and if so, how can I fix that? Below is my test setup and the test itself.
public class TestTest {
private EventHubHealthCheck eventHubHealthCheck;
private SimpleMeterRegistry meterRegistry = new SimpleMeterRegistry();
private AdminClient adminClient;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Autowired
private MockMvc mockMvc;
@BeforeAll
void beforeAll() {
eventHubHealthCheck = new EventHubHealthCheck(meterRegistry);
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
adminClient = AdminClient.create(props);
}
@AfterAll
void tearDown() {
if (adminClient != null) {
adminClient.close();
}
}
@Test
void shouldReportEventHubStatusDownWhenKafkaIsStopped() throws Exception {
embeddedKafka.destroy();
Thread.sleep(60000);
mockMvc.perform(get("/actuator/prometheus"))
.andExpect(status().isOk())
.andExpect(content().string(containsString("event_hub_connectivity_status 0.0")));
}