1.
container.start();
Никогда start()
компоненты в определениях бинов - контекст приложения еще не готов;контейнер автоматически запустит контейнеры в нужное время (если autoStartup
верно (по умолчанию).
Зачем вам фабрика контейнеров, если вы сами создаете контейнеры?
Непонятно, что вы хотите проверить.
РЕДАКТИРОВАТЬ
Вот пример программной регистрации контейнеров с использованием автоматически сконфигурированной фабрики контейнеров Spring Boot (2.2 и выше) ...
@SpringBootApplication
public class So53752783Application {
public static void main(String[] args) {
SpringApplication.run(So53752783Application.class, args);
}
@SuppressWarnings("unchecked")
@Bean
public SmartInitializingSingleton creator(ConfigurableListableBeanFactory beanFactory,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
return () -> Stream.of("foo", "bar", "baz").forEach(topic -> {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
container.getContainerProperties().setMessageListener((MessageListener<String, String>) record -> {
System.out.println("Received " + record);
});
container.getContainerProperties().setGroupId(topic + ".group");
container = (ConcurrentMessageListenerContainer<String, String>)
beanFactory.initializeBean(container, topic + ".container");
beanFactory.registerSingleton(topic + ".container", container);
container.start();
});
}
}
Для модульного тестирования вашего слушателя,
container.getContainerProperties().getMessagelistener()
сотворите его и вызовите onMessage()
и убедитесь, что оно сделало то, что вы ожидали.
EDIT2 Unit Тестирование слушателя
@SpringBootApplication
public class So53752783Application {
public static void main(String[] args) {
SpringApplication.run(So53752783Application.class, args);
}
@SuppressWarnings("unchecked")
@Bean
public SmartInitializingSingleton creator(ConfigurableListableBeanFactory beanFactory,
ConcurrentKafkaListenerContainerFactory<String, String> factory,
MyListener listener) {
return () -> Stream.of("foo", "bar", "baz").forEach(topic -> {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
container.getContainerProperties().setMessageListener(listener);
container.getContainerProperties().setGroupId(topic + ".group");
container = (ConcurrentMessageListenerContainer<String, String>)
beanFactory.initializeBean(container, topic + ".container");
beanFactory.registerSingleton(topic + ".container", container);
container.start();
});
}
@Bean
public MyListener listener() {
return new MyListener();
}
public static class MyListener implements MessageListener<String, String> {
@Autowired
private Service service;
public void setService(Service service) {
this.service = service;
}
@Override
public void onMessage(ConsumerRecord<String, String> data) {
this.service.callSomeService(data.value().toUpperCase());
}
}
public interface Service {
void callSomeService(String in);
}
@Component
public static class DefaultService implements Service {
@Override
public void callSomeService(String in) {
// ...
}
}
}
и
@RunWith(SpringRunner.class)
@SpringBootTest
public class So53752783ApplicationTests {
@Autowired
private ApplicationContext context;
@Test
public void test() {
ConcurrentMessageListenerContainer<?, ?> container = context.getBean("foo.container",
ConcurrentMessageListenerContainer.class);
MyListener messageListener = (MyListener) container.getContainerProperties().getMessageListener();
Service service = mock(Service.class);
messageListener.setService(service);
messageListener.onMessage(new ConsumerRecord<>("foo", 0, 0L, "key", "foo"));
verify(service).callSomeService("FOO");
}
}