comment_list.inc': $pre .= $default_pre .= 'comment_list.inc.htm'; break; case 'message': $pre .= $default_pre .= 'message.htm'; break; case 'tag_list': $pre .= $default_pre .= 'tag_list.htm'; break; case 'tag': $pre .= $default_pre .= 'tag.htm'; break; case 'flag': $pre .= $default_pre .= 'flag.htm'; break; case 'my': $pre .= $default_pre .= 'my.htm'; break; case 'my_password': $pre .= $default_pre .= 'my_password.htm'; break; case 'my_bind': $pre .= $default_pre .= 'my_bind.htm'; break; case 'my_avatar': $pre .= $default_pre .= 'my_avatar.htm'; break; case 'home_article': $pre .= $default_pre .= 'home_article.htm'; break; case 'home_comment': $pre .= $default_pre .= 'home_comment.htm'; break; case 'user': $pre .= $default_pre .= 'user.htm'; break; case 'user_login': $pre .= $default_pre .= 'user_login.htm'; break; case 'user_create': $pre .= $default_pre .= 'user_create.htm'; break; case 'user_resetpw': $pre .= $default_pre .= 'user_resetpw.htm'; break; case 'user_resetpw_complete': $pre .= $default_pre .= 'user_resetpw_complete.htm'; break; case 'user_comment': $pre .= $default_pre .= 'user_comment.htm'; break; case 'single_page': $pre .= $default_pre .= 'single_page.htm'; break; case 'search': $pre .= $default_pre .= 'search.htm'; break; case 'operate_sticky': $pre .= $default_pre .= 'operate_sticky.htm'; break; case 'operate_close': $pre .= $default_pre .= 'operate_close.htm'; break; case 'operate_delete': $pre .= $default_pre .= 'operate_delete.htm'; break; case 'operate_move': $pre .= $default_pre .= 'operate_move.htm'; break; case '404': $pre .= $default_pre .= '404.htm'; break; case 'read_404': $pre .= $default_pre .= 'read_404.htm'; break; case 'list_404': $pre .= $default_pre .= 'list_404.htm'; break; default: $pre .= $default_pre .= theme_mode_pre(); break; } if ($config['theme']) { $conffile = APP_PATH . 'view/template/' . $config['theme'] . '/conf.json'; $json = is_file($conffile) ? xn_json_decode(file_get_contents($conffile)) : array(); } !empty($json['installed']) and $path_file = APP_PATH . 'view/template/' . $config['theme'] . '/htm/' . ($id ? $id . '_' : '') . $pre; (empty($path_file) || !is_file($path_file)) and $path_file = APP_PATH . 'view/template/' . $config['theme'] . '/htm/' . $pre; if (!empty($config['theme_child']) && is_array($config['theme_child'])) { foreach ($config['theme_child'] as $theme) { if (empty($theme) || is_array($theme)) continue; $path_file = APP_PATH . 'view/template/' . $theme . '/htm/' . ($id ? $id . '_' : '') . $pre; !is_file($path_file) and $path_file = APP_PATH . 'view/template/' . $theme . '/htm/' . $pre; } } !is_file($path_file) and $path_file = APP_PATH . ($dir ? 'plugin/' . $dir . '/view/htm/' : 'view/htm/') . $default_pre; return $path_file; } function theme_mode_pre($type = 0) { global $config; $mode = $config['setting']['website_mode']; $pre = ''; if (1 == $mode) { $pre .= 2 == $type ? 'portal_category.htm' : 'portal.htm'; } elseif (2 == $mode) { $pre .= 2 == $type ? 'flat_category.htm' : 'flat.htm'; } else { $pre .= 2 == $type ? 'index_category.htm' : 'index.htm'; } return $pre; } ?>spring boot - EmbeddedKafkaBroker using fixed port - Stack Overflow
最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

spring boot - EmbeddedKafkaBroker using fixed port - Stack Overflow

programmeradmin0浏览0评论

I need to use EmbeddedKafkaBroker for Integration Testing where I am not using Spring-Kafka templates in my spring-boot app. I am using Apache kafka-clients jar as a dependency. My configuration is,

@Service
public class MyPublishService {

@Autowired
MyEventsProducer eventProducer; //which is wrapper on KakfaProducer and AdminClient

@PostConstruct
void  init(){ 
eventProducer.createTopic("myTopic");//calls AdminClient to create topic
}

public void publish(String topic, Object payload) {//which call sends payload to kafka
 eventProducer.send(topic, payload);
}

}//end of service
@Configuration
class MyKafkaConfig {
  String server; //which I expect to be localhost:9092 in integration tests
  @Bean
  public MyEventsProducer myEventsProducer(){
   if(localEnv()) { //tells if setup is local kafka
    server = "localhost:9092";
   }
  return new MyEventsProducer(server);
 }
}

Now my IntegrationTest set up is,

@ExtendWith(SpringExtension.class)
@SpringBootTest(
    classes = Application.class,
    webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT
)
@ActiveProfiles("test")
@AutoConfigureMockMvc
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
public class MyPublishServiceTest{

    @Autowired
    MyPublishService myPublishService;

    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;

    @Test
    public void publishMessageTest() {
        System.out.println(embeddedKafkaBroker.getBrokersAsString());       
        myPublishService.publishCollaboration(collaboration);
    }
}

I have 2 questions,

  1. Why do I always get random port for embeddedKafkaBroker.getBrokersAsString() I also tried to set spring.kafka.bootstrap-servers: localhost:9092 in application-test.yaml but it did not help. What should I do to get a fixed host:port?
  2. I am creating topic in @PostConstruct of class MyPublishService. How do I make sure Embedded kafka will be up before code reaches this point and do not cause TimeOutException since Embedded kafka may not be up.

Note: Spring-boot-version: 3.3.2 Spring-kafka-test:3.1.1 .testcontainers:kafka:1.19.3

I need to use EmbeddedKafkaBroker for Integration Testing where I am not using Spring-Kafka templates in my spring-boot app. I am using Apache kafka-clients jar as a dependency. My configuration is,

@Service
public class MyPublishService {

@Autowired
MyEventsProducer eventProducer; //which is wrapper on KakfaProducer and AdminClient

@PostConstruct
void  init(){ 
eventProducer.createTopic("myTopic");//calls AdminClient to create topic
}

public void publish(String topic, Object payload) {//which call sends payload to kafka
 eventProducer.send(topic, payload);
}

}//end of service
@Configuration
class MyKafkaConfig {
  String server; //which I expect to be localhost:9092 in integration tests
  @Bean
  public MyEventsProducer myEventsProducer(){
   if(localEnv()) { //tells if setup is local kafka
    server = "localhost:9092";
   }
  return new MyEventsProducer(server);
 }
}

Now my IntegrationTest set up is,

@ExtendWith(SpringExtension.class)
@SpringBootTest(
    classes = Application.class,
    webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT
)
@ActiveProfiles("test")
@AutoConfigureMockMvc
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
public class MyPublishServiceTest{

    @Autowired
    MyPublishService myPublishService;

    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;

    @Test
    public void publishMessageTest() {
        System.out.println(embeddedKafkaBroker.getBrokersAsString());       
        myPublishService.publishCollaboration(collaboration);
    }
}

I have 2 questions,

  1. Why do I always get random port for embeddedKafkaBroker.getBrokersAsString() I also tried to set spring.kafka.bootstrap-servers: localhost:9092 in application-test.yaml but it did not help. What should I do to get a fixed host:port?
  2. I am creating topic in @PostConstruct of class MyPublishService. How do I make sure Embedded kafka will be up before code reaches this point and do not cause TimeOutException since Embedded kafka may not be up.

Note: Spring-boot-version: 3.3.2 Spring-kafka-test:3.1.1 .testcontainers:kafka:1.19.3

Share Improve this question edited Mar 12 at 9:34 user2206366 asked Mar 12 at 9:29 user2206366user2206366 4813 gold badges7 silver badges17 bronze badges
Add a comment  | 

1 Answer 1

Reset to default 1

See @EmbeddedKafka JavaDocs:

/**
 * Set explicit ports on which the kafka brokers will listen. Useful when running an
 * embedded broker that you want to access from other processes.
 * A port must be provided for each instance, which means the number of ports must match the value of the count attribute.
 * This property is not valid when using KRaft mode.
 * @return ports for brokers.
 * @since 2.2.4
 */
int[] ports() default { 0 };

That brokerProperties.port i not involved in the Broker instance creation.

发布评论

评论列表(0)

  1. 暂无评论