Модульный тест Kafka Consumer, написанный с использованием Spring Kafka - PullRequest
0 голосов
/ 13 января 2020

Я создал потребительское приложение Kakfa (с использованием Spring Kafka), и оно работает нормально. Теперь я пытаюсь добавить блок тестовых случаев для того же. Мой потребитель является потребителем пакетного подтверждения, и он запускается внутри контейнера. Подробную информацию о потребителе можно найти в сообщении о переполнении стека ниже (которое я спрашиваю)

Свойство параллелизма Spring-Kafka

Я могу написать Тестовый пример для моего потребителя, проведя некоторые исследования, но здесь мне пришлось повторить создание контейнера и несколько других вещей, чтобы заставить его работать. Мне было интересно, есть ли какой-либо другой способ для тестовых случаев использовать тот же контейнер, который запускается во время запуска приложения (возможно, путем указания контекста теста), и потребитель, который запускается во время запуска, чтобы напрямую использовать встроенный экземпляр kafka.

Решение, которое я выяснил, приведено ниже, но я не уверен, что это правильный подход или нет.

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Launcher.class)
public class BatchConsumerTest {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "topic1");
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private KafkaTemplate<String, String> template;

    @MockBean
    private RestService restService;

    @Before
    public void setup() {
        Mockito.when(restService.invokeService("")).thenReturn("");
    }

    @Test
    public void test() throws Exception {
        ConcurrentMessageListenerContainer<?, ?> container = null;
        for (MessageListenerContainer con : registry.getAllListenerContainers()) {
            container = (ConcurrentMessageListenerContainer<?, ?>) con;
            container.stop();
        }
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group1", "false",
                embeddedKafka.getEmbeddedKafka());
        DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<String, String>(
                consumerProps);
        ContainerProperties containerProps = new ContainerProperties("ibo-grocerybag-subscription");
        containerProps.setAckMode(AckMode.MANUAL);
        ConcurrentMessageListenerContainer<String, String> messageListContainer = new ConcurrentMessageListenerContainer<String, String>(
                factory, containerProps);
        BatchAcknowledgingConsumerAwareMessageListener<String, String> listener = new BatchConsumer();
        CountDownLatch latch = new CountDownLatch(1);
        // messageListContainer.setupMessageListener(listener);
        messageListContainer.setupMessageListener(new BatchAcknowledgingConsumerAwareMessageListener<String, String>() {

            @Override
            public void onMessage(List<ConsumerRecord<String, String>> data, Acknowledgment acknowledgment,
                    Consumer<?, ?> consumer) {
                System.out.println("*******Data : " + data.get(0).value());
                listener.onMessage(data, acknowledgment, consumer);
                latch.countDown();
            }
        });
        messageListContainer.start();
        ContainerTestUtils.waitForAssignment(messageListContainer,
                embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
        template.send("topic1", "Hello");
        latch.await(10000, TimeUnit.MILLISECONDS);
        assertThat(latch.getCount()).isEqualTo(0);
    }

    @After
    public void destroy() {
        embeddedKafka.getEmbeddedKafka().destroy();
    }
...