Кафка Спринг: Как написать модульные тесты для ConcurrentKafkaListenerContainerFactory и ConcurrentMessageListenerContainer? - PullRequest
0 голосов
/ 13 декабря 2018

у меня 2 класса;1 для фабрик и другой для контейнеров слушателей:

public class ConsumerFactories() {
@Bean
  public ConcurrentKafkaListenerContainerFactory<String, Byte[]> adeKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Byte[]> factory = null;
    factory = new ConcurrentKafkaListenerContainerFactory<String, Byte[]>();
    factory.setConsumerFactory(consumerFactory1());
    factory.setConsumerFactory(consumerFactory2());
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
  }
}

И мой класс слушателей имеет несколько контейнеров:

@Bean
  public ConcurrentMessageListenerContainer<String, byte[]> adeListenerContainer() throws BeansException, ClassNotFoundException {
    final ContainerProperties containerProperties =
        new ContainerProperties("topic1");
    containerProperties.setMessageListener(new MessageListener<String, byte[]>() {
      @Override
      public void onMessage(ConsumerRecord<String, byte[]> record) {
        System.out.println("Thread is: " + Thread.currentThread().getName());
      }
    });

    ConcurrentMessageListenerContainer<String, byte[]> container =
        new ConcurrentMessageListenerContainer<>(consumerFactory1, containerProperties);
    container.setBeanName("bean1");
    container.setConcurrency(60);
    container.start();
    return container;
  }


@Bean
  public ConcurrentMessageListenerContainer<String, byte[]> adeListenerContainer() throws BeansException, ClassNotFoundException {
    final ContainerProperties containerProperties =
        new ContainerProperties("topic1");
    containerProperties.setMessageListener(new MessageListener<String, byte[]>() {
      @Override
      public void onMessage(ConsumerRecord<String, byte[]> record) {
        System.out.println("Thread is: " + Thread.currentThread().getName());
      }
    });

    ConcurrentMessageListenerContainer<String, byte[]> container =
        new ConcurrentMessageListenerContainer<>(consumerFactory2, containerProperties);
    container.setBeanName("bean2");
    container.setConcurrency(60);
    container.start();
    return container;
  }

1) Как я могу написать модульные тесты для этих 2 классов иметоды?

2) Поскольку все мои контейнеры слушателей выполняют одну и ту же обработку, но для другого набора тем, могу ли я пропустить темы, когда я устанавливаю customerFactory или каким-либо другим способом?

1 Ответ

0 голосов
/ 13 декабря 2018

1.

container.start();

Никогда start() компоненты в определениях бинов - контекст приложения еще не готов;контейнер автоматически запустит контейнеры в нужное время (если autoStartup верно (по умолчанию).

Зачем вам фабрика контейнеров, если вы сами создаете контейнеры?

Непонятно, что вы хотите проверить.

РЕДАКТИРОВАТЬ

Вот пример программной регистрации контейнеров с использованием автоматически сконфигурированной фабрики контейнеров Spring Boot (2.2 и выше) ...

@SpringBootApplication
public class So53752783Application {

    public static void main(String[] args) {
        SpringApplication.run(So53752783Application.class, args);
    }

    @SuppressWarnings("unchecked")
    @Bean
    public SmartInitializingSingleton creator(ConfigurableListableBeanFactory beanFactory,
            ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        return () -> Stream.of("foo", "bar", "baz").forEach(topic -> {
            ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
            container.getContainerProperties().setMessageListener((MessageListener<String, String>) record -> {
                System.out.println("Received " + record);
            });
            container.getContainerProperties().setGroupId(topic + ".group");
            container = (ConcurrentMessageListenerContainer<String, String>)
                    beanFactory.initializeBean(container, topic + ".container");
            beanFactory.registerSingleton(topic + ".container", container);
            container.start();
        });
    }

}

Для модульного тестирования вашего слушателя,

container.getContainerProperties().getMessagelistener()

сотворите его и вызовите onMessage() и убедитесь, что оно сделало то, что вы ожидали.

EDIT2 Unit Тестирование слушателя

@SpringBootApplication
public class So53752783Application {

    public static void main(String[] args) {
        SpringApplication.run(So53752783Application.class, args);
    }

    @SuppressWarnings("unchecked")
    @Bean
    public SmartInitializingSingleton creator(ConfigurableListableBeanFactory beanFactory,
            ConcurrentKafkaListenerContainerFactory<String, String> factory,
            MyListener listener) {

        return () -> Stream.of("foo", "bar", "baz").forEach(topic -> {
            ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
            container.getContainerProperties().setMessageListener(listener);
            container.getContainerProperties().setGroupId(topic + ".group");
            container = (ConcurrentMessageListenerContainer<String, String>)
                    beanFactory.initializeBean(container, topic + ".container");
            beanFactory.registerSingleton(topic + ".container", container);
            container.start();
        });
    }

    @Bean
    public MyListener listener() {
        return new MyListener();
    }

    public static class MyListener implements MessageListener<String, String> {

        @Autowired
        private Service service;

        public void setService(Service service) {
            this.service = service;
        }


        @Override
        public void onMessage(ConsumerRecord<String, String> data) {
            this.service.callSomeService(data.value().toUpperCase());
        }

    }

    public interface Service {

        void callSomeService(String in);

    }

    @Component
    public static class DefaultService implements Service {

        @Override
        public void callSomeService(String in) {
            // ...
        }

    }

 }

и

@RunWith(SpringRunner.class)
@SpringBootTest
public class So53752783ApplicationTests {

    @Autowired
    private ApplicationContext context;

    @Test
    public void test() {
        ConcurrentMessageListenerContainer<?, ?> container = context.getBean("foo.container",
                ConcurrentMessageListenerContainer.class);
        MyListener messageListener = (MyListener) container.getContainerProperties().getMessageListener();
        Service service = mock(Service.class);
        messageListener.setService(service);
        messageListener.onMessage(new ConsumerRecord<>("foo", 0, 0L, "key", "foo"));
        verify(service).callSomeService("FOO");
    }

}
...