Для чистого модульного теста заводской конфигурации, где фильтр
record -> record.value().contains("foo")
, вы можете сделать что-то вроде этого ...
@SpringBootTest
class So63239177ApplicationTests {
@Autowired
ConcurrentKafkaListenerContainerFactory<String, String> factory;
private int called;
@Test
void testFilter() throws Exception {
MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setBean(this);
endpoint.setMethod(getClass().getDeclaredMethod("listen", ConsumerRecord.class));
endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
endpoint.setTopics("foo");
ConcurrentMessageListenerContainer<String, String> container = factory.createListenerContainer(endpoint);
@SuppressWarnings("unchecked")
MessageListener<String, String> messageListener =
(MessageListener<String, String>) container.getContainerProperties().getMessageListener();
messageListener.onMessage(new ConsumerRecord<>("foo", 0, 0L, null, "foo"));
assertThat(this.called).isEqualTo(0);
messageListener.onMessage(new ConsumerRecord<>("foo", 0, 0L, null, "bar"));
assertThat(this.called).isEqualTo(1);
}
public void listen(ConsumerRecord<String, String> in) {
this.called++;
}
}
EDIT
Вот несколько более простой (и, возможно, более чистый) способ сделать это ...
@SpringBootTest
class So632391771ApplicationTests {
@Autowired
KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp;
@Autowired
KafkaListenerEndpointRegistry registry;
@Test
void testFilter() throws Exception {
TestListener listener = new TestListener();
this.bpp.postProcessAfterInitialization(listener, "test");
ContainerProperties containerProperties = registry.getListenerContainer("test")
.getContainerProperties();
assertThat(containerProperties.getGroupId()).isEqualTo("test");
@SuppressWarnings("unchecked")
MessageListener<String, String> messageListener =
(MessageListener<String, String>) containerProperties.getMessageListener();
messageListener.onMessage(new ConsumerRecord<>("foo", 0, 0L, null, "foo"));
assertThat(listener.called).isEqualTo(0);
messageListener.onMessage(new ConsumerRecord<>("foo", 0, 0L, null, "bar"));
assertThat(listener.called).isEqualTo(1);
}
}
class TestListener { // NOTE: Not a @Bean
int called;
@KafkaListener(id = "test", topics = "foo", autoStartup = "false")
public void listen(String in) {
this.called++;
}
}