
Cannot connect my deployed Kafka on Kubernetes with my Spring Boot application


I created a Spring Boot application that uses Kafka, which I deployed on a Kubernetes cluster.

I am facing an error stating that the deployed Spring Boot application cannot resolve the bootstrap URLs inside the Kafka cluster.

I got this error when I tried to deploy my Spring Boot application:

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:326) ~[spring-context-6.2.3.jar!/:6.2.3]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:510) ~[spring-context-6.2.3.jar!/:6.2.3]
    at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:295) ~[spring-context-6.2.3.jar!/:6.2.3]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:240) ~[spring-context-6.2.3.jar!/:6.2.3]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:1006) ~[spring-context-6.2.3.jar!/:6.2.3]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:630) ~[spring-context-6.2.3.jar!/:6.2.3]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.4.3.jar!/:3.4.3]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:752) ~[spring-boot-3.4.3.jar!/:3.4.3]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:439) ~[spring-boot-3.4.3.jar!/:3.4.3]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:318) ~[spring-boot-3.4.3.jar!/:3.4.3]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1361) ~[spring-boot-3.4.3.jar!/:3.4.3]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1350) ~[spring-boot-3.4.3.jar!/:3.4.3]
    at fr.formationacademy.scpiinvestpluspartner.ScpiInvestPlusPartnerApplication.main(ScpiInvestPlusPartnerApplication.java:10) ~[!/:0.0.1-SNAPSHOT]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:569) ~[na:na]
    at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:102) ~[scpi-invest-plus-partner.jar:0.0.1-SNAPSHOT]
    at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:64) ~[scpi-invest-plus-partner.jar:0.0.1-SNAPSHOT]
    at org.springframework.boot.loader.launch.JarLauncher.main(JarLauncher.java:40) ~[scpi-invest-plus-partner.jar:0.0.1-SNAPSHOT]

Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.<init>(LegacyKafkaConsumer.java:265) ~[kafka-clients-3.8.1.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerDelegateCreator.create(ConsumerDelegateCreator.java:65) ~[kafka-clients-3.8.1.jar!/:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:600) ~[kafka-clients-3.8.1.jar!/:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:595) ~[kafka-clients-3.8.1.jar!/:na]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer.<init>(DefaultKafkaConsumerFactory.java:498) ~[spring-kafka-3.3.3.jar!/:3.3.3]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:453) ~[spring-kafka-3.3.3.jar!/:3.3.3]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:430) ~[spring-kafka-3.3.3.jar!/:3.3.3]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:407) ~[spring-kafka-3.3.3.jar!/:3.3.3]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:374) ~[spring-kafka-3.3.3.jar!/:3.3.3]
    at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:335) ~[spring-kafka-3.3.3.jar!/:3.3.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:876) ~[spring-kafka-3.3.3.jar!/:3.3.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:387) ~[spring-kafka-3.3.3.jar!/:3.3.3]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:520) ~[spring-kafka-3.3.3.jar!/:3.3.3]
    at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:264) ~[spring-kafka-3.3.3.jar!/:3.3.3]
    at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:520) ~[spring-kafka-3.3.3.jar!/:3.3.3]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:436) ~[spring-kafka-3.3.3.jar!/:3.3.3]
    at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:382) ~[spring-kafka-3.3.3.jar!/:3.3.3]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:323) ~[spring-context-6.2.3.jar!/:6.2.3]
    ... 20 common frames omitted

Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:103) ~[kafka-clients-3.8.1.jar!/:na]
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:62) ~[kafka-clients-3.8.1.jar!/:na]
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:58) ~[kafka-clients-3.8.1.jar!/:na]
    at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.<init>(LegacyKafkaConsumer.java:184) ~[kafka-clients-3.8.1.jar!/:na]
    ... 37 common frames omitted

I configured Kafka inside Kubernetes using this configuration:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-headless
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
        - name: kafka
          image: confluentinc/cp-kafka:6.1.1
          ports:
            - containerPort: 9092
            - containerPort: 9094
          env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name

            - name: KAFKA_LISTENERS
              value: "INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094"
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "INTERNAL://$(POD_NAME).kafka-headless:9092,OUTSIDE://$(POD_NAME).kafka-svc:9094"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT"
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "3"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "INTERNAL"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: "zookeeper:2181"
            - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
              value: "false"

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
spec:
  clusterIP: None
  selector:
    app: kafka
  ports:
    - name: internal-port
      protocol: TCP
      port: 9092
      targetPort: 9092
    - name: outside-port
      protocol: TCP
      port: 9094
      targetPort: 9094

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
spec:
  selector:
    app: kafka
  ports:
    - name: internal-port
      protocol: TCP
      port: 9092
      targetPort: 9092
    - name: outside-port
      protocol: TCP
      port: 9094
      targetPort: 9094
  type: ClusterIP

And for ZooKeeper:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: zookeeper
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - name: zookeeper
          image: wurstmeister/zookeeper
          ports:
            - containerPort: 2181
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper
spec:
  selector:
    app: zookeeper
  ports:
    - protocol: TCP
      port: 2181
      targetPort: 2181

My application.yml:

spring:
  application:
    version: 1.0.0
    name: scpi-invest-plus-partner
  kafka:
    bootstrap-servers: kafka-0.kafka-headless:9092,kafka-1.kafka-headless:9092,kafka-2.kafka-headless:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: scpi-partner-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        spring.json.trusted.packages: "*"
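
The listener whose consumer fails to start is not shown above. For context, a minimal sketch of what such a listener presumably looks like (the topic and class names are purely illustrative assumptions; the group id comes from the config above):

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class PartnerRequestListener {

    // Hypothetical listener: the stack trace shows the failure happens while
    // Spring builds the consumer that backs a @KafkaListener method like this one.
    @KafkaListener(topics = "scpi-partner-request-topic", groupId = "scpi-partner-group")
    public void onMessage(String message) {
        // The bootstrap.servers resolution error occurs before this method can ever run.
        System.out.println("Received: " + message);
    }
}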


2 Answers


I can see two problems in your application config.

  1. Because you have a headless Service, you can just pass the service name and Kubernetes DNS will help you resolve it (a quick way to verify resolution is sketched after the configs below).

  2. The client communication port, as I see it, is 9094, not 9092, which is the internal broker communication port.

So the right config can look like:

spring:
  kafka:
    bootstrap-servers: kafka-svc:9094

Also, if the application and the Kafka brokers are not in the same namespace, use:

spring:
  kafka:
    bootstrap-servers: kafka-svc.namespace-name.svc:9094
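
Since the original failure is "No resolvable bootstrap urls", it can also help to check, from inside the application pod, that the chosen service name actually resolves. A small, hypothetical sketch (the host names are just the candidates suggested above; adjust to your real service name and namespace):

import java.net.InetAddress;
import java.net.UnknownHostException;

public class BootstrapDnsCheck {
    public static void main(String[] args) {
        // Hosts to test; replace with the names you put in bootstrap-servers.
        String[] hosts = {"kafka-svc", "kafka-svc.default.svc"};
        for (String host : hosts) {
            try {
                for (InetAddress address : InetAddress.getAllByName(host)) {
                    System.out.println(host + " -> " + address.getHostAddress());
                }
            } catch (UnknownHostException e) {
                System.out.println(host + " does not resolve: " + e.getMessage());
            }
        }
    }
}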

After a few modifications, I could resolve the error, but my topics are not recognized by the application.

2025-03-18 15:37:45 [scpi-invest-plus-api] [int] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] WARN  o.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-scpi-partner-group-1, groupId=scpi-partner-group] Error while fetching metadata with correlation id 124 : {scpi-partner-response-topic=UNKNOWN_TOPIC_OR_PARTITION}

This is my configuration:

spring:
  application:
    name: scpi-invest-plus-api
    version: 1.0.0

  datasource:
    url: jdbc:postgresql://scpi-invest-db:5432/postgres
    username: postgres
    password: postgres
    driver-class-name: org.postgresql.Driver

  jpa:
    database: postgresql
    hibernate:
      ddl-auto: validate
    properties:
      hibernate:
        dialect: org.hibernate.dialect.PostgreSQLDialect
  kafka:
    bootstrap-servers: kafka-headless.kafka:9092

    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: scpi-partner-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"

  security:
    oauth2:
      resourceserver:
        jwt:
          issuer-uri: https://keycloak.check-consulting/realms/master
          jwk-set-uri: https://keycloak.check-consulting/realms/master/protocol/openid-connect/certs


management:
  endpoints:
    web:
      exposure:
        include: health,prometheus
  endpoint:
    prometheus:
      enabled: true
  prometheus:
    metrics:
      export:
        enabled: true

The method where I send the message using Kafka:

public InvestmentDto saveInvestment(InvestmentDto investmentDto) throws GlobalException {
        log.info("Début de la création d'un investissement.");

        if (investmentDto == null) {
            log.error("L'objet InvestmentDto est null.");
            throw new GlobalException(HttpStatus.BAD_REQUEST, "InvestmentDto ne peut pas être null.");
        }

        if (investmentDto.getScpiId() == null) {
            log.error("L'ID de la SCPI est null.");
            throw new GlobalException(HttpStatus.BAD_REQUEST, "L'ID de la SCPI ne peut pas être null.");
        }

        String email = userService.getEmail();
        log.info("Récupération de l'email de l'utilisateur : {}", email);

        ScpiDtoOut scpiDtoOut = scpiService.getScpiDetailsById(investmentDto.getScpiId());
        log.info("Détails SCPI récupérés : {}", scpiDtoOut);
        if (scpiDtoOut == null) {
            log.error("SCPI non trouvée pour ID: {}", investmentDto.getScpiId());
            throw new GlobalException(HttpStatus.NOT_FOUND, "Aucune SCPI trouvée avec l'ID: " + investmentDto.getScpiId());
        }
        log.info("SCPI trouvée : {} - {}", scpiDtoOut.getId(), scpiDtoOut.getName());

        Scpi scpiEntity = scpiRepository.findById(investmentDto.getScpiId())
                .orElseThrow(() -> new GlobalException(HttpStatus.NOT_FOUND, "SCPI non trouvée"));

        Investment investment = investmentMapper.toEntity(investmentDto);
        investment.setInvestorId(email);
        investment.setInvestmentState("En cours");
        investment.setScpi(scpiEntity);
        
        Investment savedInvestment = investmentRepository.save(investment);
        log.info("Investissement enregistré avec succès - ID: {}", savedInvestment.getId());

        InvestmentKafkaDto kafkaDto = new InvestmentKafkaDto();
        InvestmentOutDto investmentOutDto = investmentMapper.toOutDto(savedInvestment);
        investmentOutDto.setId(savedInvestment.getId());
        kafkaDto.setInvestmentDto(investmentOutDto);
        kafkaDto.setInvestorEmail(email);
        kafkaDto.setScpi(scpiDtoOut);


        log.info("Envoi la demande d'investissement au Bouchon pour Objet Traitement : {}", kafkaDto);
        sendInvestment(kafkaDto);
        log.info("Investissement envoyé avec succès à Kafka - ID: {}", savedInvestment.getId());

        return investmentMapper.toDTO(savedInvestment);
    }
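
The sendInvestment method itself is not shown in the question. A minimal, hypothetical sketch of how it might look with a KafkaTemplate (the class name is an assumption; InvestmentKafkaDto and SCPI_REQUEST_TOPIC are the types and constant used in the surrounding code):

import static fr.formationacademy.scpiinvestplusapi.utils.Constants.SCPI_REQUEST_TOPIC;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Hypothetical producer component; the real sendInvestment(...) may live in the same service class.
@Component
public class InvestmentProducer {

    private static final Logger log = LoggerFactory.getLogger(InvestmentProducer.class);

    private final KafkaTemplate<String, InvestmentKafkaDto> kafkaTemplate;

    public InvestmentProducer(KafkaTemplate<String, InvestmentKafkaDto> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendInvestment(InvestmentKafkaDto kafkaDto) {
        // SCPI_REQUEST_TOPIC is the same constant declared in KafkaTopicConfig below.
        kafkaTemplate.send(SCPI_REQUEST_TOPIC, kafkaDto)
                .whenComplete((result, ex) -> {
                    if (ex != null) {
                        log.error("Failed to send investment to Kafka", ex);
                    } else {
                        log.info("Investment sent to {}-{}",
                                result.getRecordMetadata().topic(),
                                result.getRecordMetadata().partition());
                    }
                });
    }
}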

I also configured the topic:

import static fr.formationacademy.scpiinvestplusapi.utils.Constants.SCPI_REQUEST_TOPIC;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {
    @Bean
    public NewTopic getTopic() {
        return TopicBuilder.name(SCPI_REQUEST_TOPIC)
                .partitions(1)
                .replicas(1)
                .build();
    }
}
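
Note that the warning above refers to scpi-partner-response-topic while this bean only declares SCPI_REQUEST_TOPIC, and auto-creation is disabled on the brokers, so it may be worth listing which topics actually exist. A small, hypothetical diagnostic using Kafka's AdminClient (bootstrap servers taken from the application.yml above; adjust as needed):

import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class TopicCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Same bootstrap servers as in application.yml.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-headless.kafka:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Lists the topics that exist on the brokers, so the name in the
            // UNKNOWN_TOPIC_OR_PARTITION warning can be compared against them.
            Set<String> topics = admin.listTopics().names().get();
            System.out.println("Existing topics: " + topics);
        }
    }
}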
