Нужна помощь | UnitTest | Пользовательский ConcurrentKafkaListenerContainerFactory с настраиваемой записью FilterStrategy - PullRequest
0 голосов
/ 04 августа 2020

Попытка выполнить модульное тестирование, ConcurrentKafkaListenerContainerFactory с настраиваемой RecordFilterStrategy, однако не удалось найти оптимальный подход для проверки стратегии фильтрации.

  class ConsumerConfiguration {

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Message> messageListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Message> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(consumerRecord -> consumerRecord.value().getType().contains("MYFILTER"));
    return factory;
  }

}

Модульный тест

  @Mock
  KafkaProperties kafkaProperties;
  ConsumerConfiguration configuration;
  @Test
  void testMessageListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Message> actual =
        configuration.messageListenerContainerFactory();
    assertEquals(kafkaProperties.buildConsumerProperties(),
        actual.getContainerProperties().getKafkaConsumerProperties());
  }

Хотя этот код хорош для модульного тестирования, однако он не обеспечивает покрытия кода для Custom RecordFilterStrategy, то есть лямбда-функции, поэтому нужна помощь, если кто-то уже выполнил unittest / покрытие кода для Kafka Listener Container Factory и какой подход лучше всего подходит для обработка.

1 Ответ

0 голосов
/ 04 августа 2020

Для чистого модульного теста заводской конфигурации, где фильтр

record -> record.value().contains("foo")

, вы можете сделать что-то вроде этого ...

@SpringBootTest
class So63239177ApplicationTests {

    @Autowired
    ConcurrentKafkaListenerContainerFactory<String, String> factory;

    private int called;

    @Test
    void testFilter() throws Exception {
        MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setBean(this);
        endpoint.setMethod(getClass().getDeclaredMethod("listen", ConsumerRecord.class));
        endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
        endpoint.setTopics("foo");
        ConcurrentMessageListenerContainer<String, String> container = factory.createListenerContainer(endpoint);
        @SuppressWarnings("unchecked")
        MessageListener<String, String> messageListener =
                (MessageListener<String, String>) container.getContainerProperties().getMessageListener();
        messageListener.onMessage(new ConsumerRecord<>("foo", 0, 0L, null, "foo"));
        assertThat(this.called).isEqualTo(0);
        messageListener.onMessage(new ConsumerRecord<>("foo", 0, 0L, null, "bar"));
        assertThat(this.called).isEqualTo(1);
    }

    public void listen(ConsumerRecord<String, String> in) {
        this.called++;
    }

}

EDIT

Вот несколько более простой (и, возможно, более чистый) способ сделать это ...

@SpringBootTest
class So632391771ApplicationTests {

    @Autowired
    KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp;

    @Autowired
    KafkaListenerEndpointRegistry registry;

    @Test
    void testFilter() throws Exception {
        TestListener listener = new TestListener();
        this.bpp.postProcessAfterInitialization(listener, "test");
        ContainerProperties containerProperties = registry.getListenerContainer("test")
                .getContainerProperties();
        assertThat(containerProperties.getGroupId()).isEqualTo("test");
        @SuppressWarnings("unchecked")
        MessageListener<String, String> messageListener =
                    (MessageListener<String, String>) containerProperties.getMessageListener();
        messageListener.onMessage(new ConsumerRecord<>("foo", 0, 0L, null, "foo"));
        assertThat(listener.called).isEqualTo(0);
        messageListener.onMessage(new ConsumerRecord<>("foo", 0, 0L, null, "bar"));
        assertThat(listener.called).isEqualTo(1);
    }

}

class TestListener { // NOTE: Not a @Bean

    int called;

    @KafkaListener(id = "test", topics = "foo", autoStartup = "false")
    public void listen(String in) {
        this.called++;
    }

}
...