У меня есть потребитель Kafka, который использует аннотацию @KafkaListener
. Чтобы написать тестовые примеры с использованием встроенного сервера Kafka, я выполнил следующий код: Как написать модульный тест для @KafkaListener? . Предлагаемый подход в примере кода не работает. Единственная разница в моем коде заключается в том, что значение не является строкой. Итак, я настроил ProducerFactory соответственно с JsonSerializer
.
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("test")
@DirtiesContext
public class SpringKafkaReceiverTest {
private static String RECEIVER_TOPIC="messageRouterTopic";
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, RECEIVER_TOPIC);
@Mock
Processor processor;
@Autowired
private KafkaListenerEndpointRegistry registry;
private KafkaTemplate<String, Envelope> template;
@Bean
private ProducerFactory<String, Envelope> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getEmbeddedKafka().getBrokersAsString());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Before
public void setup() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getEmbeddedKafka().getBrokersAsString());
template = new KafkaTemplate<>(producerFactory());
MockitoAnnotations.initMocks(this);
}
@Test
public void test() throws Exception {
ConcurrentMessageListenerContainer<?, ?> container = (ConcurrentMessageListenerContainer<?, ?>) registry.getListenerContainer(RECEIVER_TOPIC);
container.stop();
@SuppressWarnings("unchecked")
AcknowledgingConsumerAwareMessageListener<String, Envelope> messageListener = (AcknowledgingConsumerAwareMessageListener<String, Envelope>) container
.getContainerProperties().getMessageListener();
CountDownLatch latch = new CountDownLatch(1);
container.getContainerProperties().setMessageListener(new AcknowledgingConsumerAwareMessageListener<String, Envelope>(){
@Override
public void onMessage(ConsumerRecord<String, Envelope> data, Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
messageListener.onMessage(data, acknowledgment, consumer);
latch.countDown();
}
});
container.start();
Envelope message = new Envelope();
message.setAction("sampleAction");
message.setValue("This is the object for testing purpose");
// These are the void methods
// Since these methods will be invoked when the listener method listens to the Kafka topic, it is not explicitly called here.
doNothing().when(processor).setCommand(any());
doNothing().when(processor).allocateEnvelopes(any());
template.send(RECEIVER_TOPIC, message);
verify(processor, Mockito.times(1)).allocateEnvelopes(any());
//assertTrue(latch.await(10, TimeUnit.SECONDS));
}
}
Класс, который загружает приложение:
@SpringBootApplication
public class PostOfficeApplication {
private static ApplicationContext appContext;
public static void main(String[] args) {
appContext = SpringApplication.run(PostOfficeApplication.class, args);
}
public static ApplicationContext getApplicationContext() {
return appContext;
}
}
Код слушателя Kafka выглядит следующим образом:
@Bean
public ConsumerFactory<String, Envelope> consumerFactory(List<String> consumerBootstrapServers) {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerBootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "postOfficeGrp");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(Envelope.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Envelope> kafkaListenerFactory(ConsumerFactory<String, Envelope> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, Envelope> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
}
@Service
public class Mailbox {
@Autowired
Processor processor;
@KafkaListener(id="messageRouterTopic" , topics = "${app.topic}", groupId = "postOfficeGrp", containerFactory = "kafkaListenerFactory")
public void processKafkaMessages(Envelope object) {
processor.setCommand(PostOfficeApplication.getApplicationContext().getBean(object.getAction(), Command.class));
processor.allocateEnvelopes(object);
}
}
И когда я запускаю mvn test
, я вижу следующую ошибку в консоли:
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 6.975 s <<< FAILURE! - in com.walmartlabs.gioda.po.consumer.SpringKafkaReceiverTest
[ERROR] test Time elapsed: 0.246 s <<< ERROR!
java.lang.NullPointerException
at SpringKafkaReceiverTest.test(SpringKafkaReceiverTest.java:84)
[INFO]
[INFO] Results:
[INFO]
[ERROR] Errors:
[ERROR] SpringKafkaReceiverTest.test:84 NullPointer
[INFO]
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.286 s
[INFO] Finished at: 2020-08-07T10:07:09-05:00
[INFO] ------------------------------------------------------------------------
Каждый раз, когда я запускаю код, container.stop()
завершается с ошибкой NullPointerException. Как я могу это решить? Я что-нибудь упустил?