I created a Spring Boot application that uses Kafka, which I deployed on a Kubernetes cluster.
I am facing an error stating that the deployed Spring Boot application cannot resolve the bootstrap URLs inside the Kafka cluster.
I got this error when I tried to deploy my Spring Boot application:
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:326) ~[spring-context-6.2.3.jar!/:6.2.3]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:510) ~[spring-context-6.2.3.jar!/:6.2.3]
at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:295) ~[spring-context-6.2.3.jar!/:6.2.3]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:240) ~[spring-context-6.2.3.jar!/:6.2.3]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:1006) ~[spring-context-6.2.3.jar!/:6.2.3]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:630) ~[spring-context-6.2.3.jar!/:6.2.3]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.4.3.jar!/:3.4.3]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:752) ~[spring-boot-3.4.3.jar!/:3.4.3]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:439) ~[spring-boot-3.4.3.jar!/:3.4.3]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:318) ~[spring-boot-3.4.3.jar!/:3.4.3]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1361) ~[spring-boot-3.4.3.jar!/:3.4.3]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1350) ~[spring-boot-3.4.3.jar!/:3.4.3]
at fr.formationacademy.scpiinvestpluspartner.ScpiInvestPlusPartnerApplication.main(ScpiInvestPlusPartnerApplication.java:10) ~[!/:0.0.1-SNAPSHOT]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:569) ~[na:na]
at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:102) ~[scpi-invest-plus-partner.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:64) ~[scpi-invest-plus-partner.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.launch.JarLauncher.main(JarLauncher.java:40) ~[scpi-invest-plus-partner.jar:0.0.1-SNAPSHOT]
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.<init>(LegacyKafkaConsumer.java:265) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerDelegateCreator.create(ConsumerDelegateCreator.java:65) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:600) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:595) ~[kafka-clients-3.8.1.jar!/:na]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer.<init>(DefaultKafkaConsumerFactory.java:498) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:453) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:430) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:407) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:374) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:335) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:876) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:387) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:520) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:264) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:520) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:436) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:382) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:323) ~[spring-context-6.2.3.jar!/:6.2.3]
... 20 common frames omitted
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:103) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:62) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:58) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.<init>(LegacyKafkaConsumer.java:184) ~[kafka-clients-3.8.1.jar!/:na]
... 37 common frames omitted
I configured Kafka inside Kubernetes using this configuration:
# Kafka brokers: a 3-replica StatefulSet whose governing service is `kafka-headless`.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-headless
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
        - name: kafka
          image: confluentinc/cp-kafka:6.1.1
          ports:
            - containerPort: 9092
            - containerPort: 9094
          env:
            # Pod name injected through the downward API; interpolated into the
            # advertised listeners below via $(POD_NAME).
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: KAFKA_LISTENERS
              value: "INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094"
            # NOTE(review): the short name <pod>.kafka-headless presumably resolves only
            # from pods in the same namespace as this StatefulSet, and <pod>.kafka-svc is
            # presumably not resolvable at all, since kafka-svc is not the governing
            # headless service — verify both against the cluster DNS.
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "INTERNAL://$(POD_NAME).kafka-headless:9092,OUTSIDE://$(POD_NAME).kafka-svc:9094"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT"
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "3"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "INTERNAL"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: "zookeeper:2181"
            # Topic auto-creation is disabled here.
            - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
              value: "false"
---
# Headless service (clusterIP: None) selecting the Kafka pods; it is the
# serviceName referenced by the kafka StatefulSet.
apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
spec:
  clusterIP: None
  selector:
    app: kafka
  ports:
    - name: internal-port
      protocol: TCP
      port: 9092
      targetPort: 9092
    - name: outside-port
      protocol: TCP
      port: 9094
      targetPort: 9094
---
# Regular ClusterIP service in front of all Kafka pods (single virtual IP).
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
spec:
  selector:
    app: kafka
  ports:
    - name: internal-port
      protocol: TCP
      port: 9092
      targetPort: 9092
    - name: outside-port
      protocol: TCP
      port: 9094
      targetPort: 9094
  type: ClusterIP
And for zookeeper:
# Single-replica ZooKeeper used by the Kafka brokers (KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: zookeeper
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - name: zookeeper
          image: wurstmeister/zookeeper
          ports:
            - containerPort: 2181
---
# Service exposing the ZooKeeper pod on port 2181 under the name `zookeeper`.
apiVersion: v1
kind: Service
metadata:
  name: zookeeper
spec:
  selector:
    app: zookeeper
  ports:
    - protocol: TCP
      port: 2181
      targetPort: 2181
My application.yml:
# Spring Boot configuration of the scpi-invest-plus-partner application.
spring:
  application:
    version: 1.0.0
    name: scpi-invest-plus-partner
  kafka:
    # Per-pod broker addresses, using short names of the headless service.
    bootstrap-servers: kafka-0.kafka-headless:9092,kafka-1.kafka-headless:9092,kafka-2.kafka-headless:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: scpi-partner-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # NOTE(review): nesting reconstructed from a flattened paste; `properties` is
      # assumed to belong to `consumer` — confirm against the real file.
      properties:
        spring.json.trusted.packages: "*"
I created a Spring Boot application that uses Kafka, which I deployed on a Kubernetes cluster.
I am facing an error stating that the deployed Spring Boot application cannot resolve the bootstrap URLs inside the Kafka cluster.
I got this error when I tried to deploy my Spring Boot application:
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:326) ~[spring-context-6.2.3.jar!/:6.2.3]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:510) ~[spring-context-6.2.3.jar!/:6.2.3]
at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:295) ~[spring-context-6.2.3.jar!/:6.2.3]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:240) ~[spring-context-6.2.3.jar!/:6.2.3]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:1006) ~[spring-context-6.2.3.jar!/:6.2.3]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:630) ~[spring-context-6.2.3.jar!/:6.2.3]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:146) ~[spring-boot-3.4.3.jar!/:3.4.3]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:752) ~[spring-boot-3.4.3.jar!/:3.4.3]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:439) ~[spring-boot-3.4.3.jar!/:3.4.3]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:318) ~[spring-boot-3.4.3.jar!/:3.4.3]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1361) ~[spring-boot-3.4.3.jar!/:3.4.3]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1350) ~[spring-boot-3.4.3.jar!/:3.4.3]
at fr.formationacademy.scpiinvestpluspartner.ScpiInvestPlusPartnerApplication.main(ScpiInvestPlusPartnerApplication.java:10) ~[!/:0.0.1-SNAPSHOT]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:569) ~[na:na]
at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:102) ~[scpi-invest-plus-partner.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:64) ~[scpi-invest-plus-partner.jar:0.0.1-SNAPSHOT]
at org.springframework.boot.loader.launch.JarLauncher.main(JarLauncher.java:40) ~[scpi-invest-plus-partner.jar:0.0.1-SNAPSHOT]
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.<init>(LegacyKafkaConsumer.java:265) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerDelegateCreator.create(ConsumerDelegateCreator.java:65) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:600) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:595) ~[kafka-clients-3.8.1.jar!/:na]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer.<init>(DefaultKafkaConsumerFactory.java:498) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:453) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:430) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:407) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:374) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:335) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:876) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:387) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:520) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:264) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:520) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:436) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:382) ~[spring-kafka-3.3.3.jar!/:3.3.3]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:323) ~[spring-context-6.2.3.jar!/:6.2.3]
... 20 common frames omitted
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:103) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:62) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:58) ~[kafka-clients-3.8.1.jar!/:na]
at org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.<init>(LegacyKafkaConsumer.java:184) ~[kafka-clients-3.8.1.jar!/:na]
... 37 common frames omitted
I configured Kafka inside Kubernetes using this configuration:
# Kafka brokers: a 3-replica StatefulSet whose governing service is `kafka-headless`.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-headless
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
        - name: kafka
          image: confluentinc/cp-kafka:6.1.1
          ports:
            - containerPort: 9092
            - containerPort: 9094
          env:
            # Pod name injected through the downward API; interpolated into the
            # advertised listeners below via $(POD_NAME).
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: KAFKA_LISTENERS
              value: "INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094"
            # NOTE(review): the short name <pod>.kafka-headless presumably resolves only
            # from pods in the same namespace as this StatefulSet, and <pod>.kafka-svc is
            # presumably not resolvable at all, since kafka-svc is not the governing
            # headless service — verify both against the cluster DNS.
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "INTERNAL://$(POD_NAME).kafka-headless:9092,OUTSIDE://$(POD_NAME).kafka-svc:9094"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT"
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "3"
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "INTERNAL"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: "zookeeper:2181"
            # Topic auto-creation is disabled here.
            - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
              value: "false"
---
# Headless service (clusterIP: None) selecting the Kafka pods; it is the
# serviceName referenced by the kafka StatefulSet.
apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
spec:
  clusterIP: None
  selector:
    app: kafka
  ports:
    - name: internal-port
      protocol: TCP
      port: 9092
      targetPort: 9092
    - name: outside-port
      protocol: TCP
      port: 9094
      targetPort: 9094
---
# Regular ClusterIP service in front of all Kafka pods (single virtual IP).
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
spec:
  selector:
    app: kafka
  ports:
    - name: internal-port
      protocol: TCP
      port: 9092
      targetPort: 9092
    - name: outside-port
      protocol: TCP
      port: 9094
      targetPort: 9094
  type: ClusterIP
And for zookeeper:
# Single-replica ZooKeeper used by the Kafka brokers (KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181).
apiVersion: apps/v1
kind: Deployment
metadata:
  name: zookeeper
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
        - name: zookeeper
          image: wurstmeister/zookeeper
          ports:
            - containerPort: 2181
---
# Service exposing the ZooKeeper pod on port 2181 under the name `zookeeper`.
apiVersion: v1
kind: Service
metadata:
  name: zookeeper
spec:
  selector:
    app: zookeeper
  ports:
    - protocol: TCP
      port: 2181
      targetPort: 2181
My application.yml:
# Spring Boot configuration of the scpi-invest-plus-partner application.
spring:
  application:
    version: 1.0.0
    name: scpi-invest-plus-partner
  kafka:
    # Per-pod broker addresses, using short names of the headless service.
    bootstrap-servers: kafka-0.kafka-headless:9092,kafka-1.kafka-headless:9092,kafka-2.kafka-headless:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: scpi-partner-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # NOTE(review): nesting reconstructed from a flattened paste; `properties` is
      # assumed to belong to `consumer` — confirm against the real file.
      properties:
        spring.json.trusted.packages: "*"
Share
Improve this question
asked Mar 18 at 1:27
SpeedskillsxSpeedskillsx
113 bronze badges
2 Answers
Reset to default 1
I can see two problems in your application config.
1. Because you have a headless service, you can just pass the service name and the Kubernetes DNS will resolve it for you.
2. As far as I can see, the client communication port is 9094, not 9092, which is used for internal broker communication.
So the right config could look like:
spring:
  kafka:
    bootstrap-servers: kafka-svc:9094
Also, if the application and the Kafka brokers are not in the same namespace, use the namespace-qualified service name:
spring:
  kafka:
    bootstrap-servers: kafka-svc.namespace-name.svc:9094
After a few modifications, I was able to resolve the error, but my topics are not recognized by the application.
2025-03-18 15:37:45 [scpi-invest-plus-api] [int] [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-scpi-partner-group-1, groupId=scpi-partner-group] Error while fetching metadata with correlation id 124 : {scpi-partner-response-topic=UNKNOWN_TOPIC_OR_PARTITION}
This is my configuration:
# Spring Boot configuration of the scpi-invest-plus-api application.
spring:
  application:
    name: scpi-invest-plus-api
    version: 1.0.0
  datasource:
    url: jdbc:postgresql://scpi-invest-db:5432/postgres
    username: postgres
    password: postgres
    driver-class-name: org.postgresql.Driver
  jpa:
    database: postgresql
    hibernate:
      ddl-auto: validate
    properties:
      hibernate:
        dialect: org.hibernate.dialect.PostgreSQLDialect
  kafka:
    # Headless service name qualified with a namespace-like suffix
    # (presumably the namespace is `kafka` — confirm).
    bootstrap-servers: kafka-headless.kafka:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: scpi-partner-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      # NOTE(review): nesting reconstructed from a flattened paste; `properties` is
      # assumed to belong to `consumer` — confirm against the real file.
      properties:
        spring.json.trusted.packages: "*"
  security:
    oauth2:
      resourceserver:
        jwt:
          issuer-uri: https://keycloak.check-consulting/realms/master
          jwk-set-uri: https://keycloak.check-consulting/realms/master/protocol/openid-connect/certs
management:
  endpoints:
    web:
      exposure:
        include: health,prometheus
  endpoint:
    prometheus:
      enabled: true
  prometheus:
    metrics:
      export:
        enabled: true
The method where I send the message using Kafka:
/**
 * Creates an investment for the current user, persists it and forwards it via
 * {@code sendInvestment}.
 *
 * <p>Steps, in order: validate the request, read the user's email from
 * {@code userService}, load the SCPI details and entity, save the mapped
 * {@code Investment} with state {@code "En cours"}, then build an
 * {@code InvestmentKafkaDto} and pass it to {@code sendInvestment}.
 *
 * @param investmentDto the investment request; must be non-null and carry a non-null SCPI id
 * @return the saved investment mapped back to a DTO
 * @throws GlobalException with {@code BAD_REQUEST} when the request or its SCPI id is null,
 *     or with {@code NOT_FOUND} when no SCPI matches the given id
 */
public InvestmentDto saveInvestment(InvestmentDto investmentDto) throws GlobalException {
    log.info("Début de la création d'un investissement.");

    // Guard clauses: reject unusable requests before touching any service.
    if (investmentDto == null) {
        log.error("L'objet InvestmentDto est null.");
        throw new GlobalException(HttpStatus.BAD_REQUEST, "InvestmentDto ne peut pas être null.");
    }
    if (investmentDto.getScpiId() == null) {
        log.error("L'ID de la SCPI est null.");
        throw new GlobalException(HttpStatus.BAD_REQUEST, "L'ID de la SCPI ne peut pas être null.");
    }

    String investorEmail = userService.getEmail();
    log.info("Récupération de l'email de l'utilisateur : {}", investorEmail);

    // The SCPI is looked up twice: once as a DTO (sent along with the message)
    // and once as an entity (attached to the persisted investment).
    ScpiDtoOut scpiDetails = scpiService.getScpiDetailsById(investmentDto.getScpiId());
    log.info("Détails SCPI récupérés : {}", scpiDetails);
    if (scpiDetails == null) {
        log.error("SCPI non trouvée pour ID: {}", investmentDto.getScpiId());
        throw new GlobalException(
                HttpStatus.NOT_FOUND,
                "Aucune SCPI trouvée avec l'ID: " + investmentDto.getScpiId());
    }
    log.info("SCPI trouvée : {} - {}", scpiDetails.getId(), scpiDetails.getName());

    Scpi scpi = scpiRepository
            .findById(investmentDto.getScpiId())
            .orElseThrow(() -> new GlobalException(HttpStatus.NOT_FOUND, "SCPI non trouvée"));

    Investment toPersist = investmentMapper.toEntity(investmentDto);
    toPersist.setInvestorId(investorEmail);
    toPersist.setInvestmentState("En cours");
    toPersist.setScpi(scpi);

    Investment persisted = investmentRepository.save(toPersist);
    log.info("Investissement enregistré avec succès - ID: {}", persisted.getId());

    // Assemble the outgoing message from the saved investment.
    InvestmentKafkaDto message = new InvestmentKafkaDto();
    InvestmentOutDto outDto = investmentMapper.toOutDto(persisted);
    outDto.setId(persisted.getId());
    message.setInvestmentDto(outDto);
    message.setInvestorEmail(investorEmail);
    message.setScpi(scpiDetails);

    log.info("Envoi la demande d'investissement au Bouchon pour Objet Traitement : {}", message);
    sendInvestment(message);
    log.info("Investissement envoyé avec succès à Kafka - ID: {}", persisted.getId());

    return investmentMapper.toDTO(persisted);
}
I also configured the topic :
import static fr.formationacademy.scpiinvestplusapi.utils.Constants.SCPI_REQUEST_TOPIC;
/**
 * Spring configuration declaring the Kafka topic named by {@code SCPI_REQUEST_TOPIC}.
 *
 * <p>NOTE(review): only the request topic is declared here. If this application also
 * consumes from another topic (e.g. a response topic), confirm where that topic is
 * created, especially when broker-side topic auto-creation is disabled.
 */
@Configuration
public class KafkaTopicConfig {

    /** Number of partitions requested for the topic. */
    private static final int PARTITION_COUNT = 1;

    /** Number of replicas requested for the topic. */
    private static final int REPLICA_COUNT = 1;

    /**
     * Describes the request topic with a single partition and a single replica.
     *
     * @return the {@code NewTopic} definition built for {@code SCPI_REQUEST_TOPIC}
     */
    @Bean
    public NewTopic getTopic() {
        NewTopic requestTopic = TopicBuilder.name(SCPI_REQUEST_TOPIC)
                .partitions(PARTITION_COUNT)
                .replicas(REPLICA_COUNT)
                .build();
        return requestTopic;
    }
}