Kafka Consumer / Producer тестирование весной Kafka - PullRequest
0 голосов
/ 08 декабря 2018

Я сейчас работаю над модулем Kafka, где я использую spring-kafka абстракцию связи Kafka.Я могу интегрировать производителя и потребителя с точки зрения реальной реализации, однако я не уверен, как тестировать (в частности, интеграционный тест) бизнес-логику окружает у потребителя @KafkaListener.Я пытался следить за spring-kafk документацией и различными блогами по этой теме, но ни один из них не ответил на мой заданный вопрос.

Тестовый класс Spring Boot

//imports not mentioned due to brevity

@RunWith(SpringRunner.class)
@SpringBootTest(classes = PaymentAccountUpdaterApplication.class,
                webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class CardUpdaterMessagingIntegrationTest {

    private final static String cardUpdateTopic = "TP.PRF.CARDEVENTS";

    @Autowired
    private ObjectMapper objectMapper;

    @ClassRule
    public static KafkaEmbedded kafkaEmbedded =
            new KafkaEmbedded(1, false, cardUpdateTopic);

    @Test
    public void sampleTest() throws Exception {
        Map<String, Object> consumerConfig =
                KafkaTestUtils.consumerProps("test", "false", kafkaEmbedded);
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerConfig);
        ContainerProperties containerProperties = new ContainerProperties(cardUpdateTopic);
        containerProperties.setMessageListener(new SafeStringJsonMessageConverter());
        KafkaMessageListenerContainer<String, String>
                container = new KafkaMessageListenerContainer<>(cf, containerProperties);

        BlockingQueue<ConsumerRecord<String, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, String>) data -> {
            System.out.println("Added to Queue: "+ data);
            records.add(data);
        });
        container.setBeanName("templateTests");
        container.start();
        ContainerTestUtils.waitForAssignment(container, kafkaEmbedded.getPartitionsPerTopic());


        Map<String, Object> producerConfig = KafkaTestUtils.senderProps(kafkaEmbedded.getBrokersAsString());
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        ProducerFactory<String, Object> pf =
                new DefaultKafkaProducerFactory<>(producerConfig);
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(pf);

        String payload = objectMapper.writeValueAsString(accountWrapper());
        kafkaTemplate.send(cardUpdateTopic, 0, payload);
        ConsumerRecord<String, String> received = records.poll(10, TimeUnit.SECONDS);

        assertThat(received).has(partition(0));
    }


    @After
    public void after() {
        kafkaEmbedded.after();
    }

    private AccountWrapper accountWrapper() {
        return AccountWrapper.builder()
                .eventSource("PROFILE")
                .eventName("INITIAL_LOAD_CARD")
                .eventTime(LocalDateTime.now().toString())
                .eventID("8730c547-02bd-45c0-857b-d90f859e886c")
                .details(AccountDetail.builder()
                        .customerId("idArZ_K2IgE86DcPhv-uZw")
                        .vaultId("912A60928AD04F69F3877D5B422327EE")
                        .expiryDate("122019")
                        .build())
                .build();
    }
}

Класс слушателя

@Service
public class ConsumerMessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerMessageListener.class);

    private ConsumerMessageProcessorService consumerMessageProcessorService;

    public ConsumerMessageListener(ConsumerMessageProcessorService consumerMessageProcessorService) {
        this.consumerMessageProcessorService = consumerMessageProcessorService;
    }


    @KafkaListener(id = "cardUpdateEventListener",
            topics = "${kafka.consumer.cardupdates.topic}",
            containerFactory = "kafkaJsonListenerContainerFactory")
    public void processIncomingMessage(Payload<AccountWrapper,Object> payloadContainer,
                                       Acknowledgment acknowledgment,
                                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                       @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
                                       @Header(KafkaHeaders.OFFSET) String offset) {

        try {
            // business logic to process the message
            consumerMessageProcessorService.processIncomingMessage(payloadContainer);
        } catch (Exception e) {
            LOGGER.error("Unhandled exception in card event message consumer. Discarding offset commit." +
                    "message:: {}, details:: {}", e.getMessage(), messageMetadataInfo);
            throw e;
        }
        acknowledgment.acknowledge();
    }
}

Мой вопрос: в тестовом классе я утверждаю раздел, полезную нагрузку и т. Д., Который опрашивает с BlockingQueue, однако мой вопрос заключается в том, как проверить, что мойбизнес-логика в классе, помеченном @KafkaListener, выполняется правильно и перенаправляет сообщения в разные темы на основе обработки ошибок и других бизнес-сценариев.В некоторых примерах я видел CountDownLatch, чтобы утверждать, что я не хочу вставлять в свою бизнес-логику, чтобы утверждать в коде производственного уровня.Также обработчик сообщений Async, так что, как утверждать выполнение, не уверен.

Любая помощь, приветствуется.

1 Ответ

0 голосов
/ 08 декабря 2018

выполняется правильно и направляет сообщения в другую тему на основе обработки ошибок и других бизнес-сценариев.

Из этой «другой» темы может потребоваться интеграционный тест, чтобы утверждать, чтослушатель обработал его, как и ожидалось.

Вы также можете добавить BeanPostProcessor в ваш тестовый пример и обернуть bean-компонент ConsumerMessageListener в прокси, чтобы убедиться, что входные аргументы соответствуют ожидаемым.

РЕДАКТИРОВАТЬ

Вот пример обертывания слушателя в прокси ...

@SpringBootApplication
public class So53678801Application {

    public static void main(String[] args) {
        SpringApplication.run(So53678801Application.class, args);
    }

    @Bean
    public MessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        public Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

@Component
class Listener {

    @KafkaListener(id = "so53678801", topics = "so53678801")
    public void processIncomingMessage(Foo payload,
            Acknowledgment acknowledgment,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
            @Header(KafkaHeaders.OFFSET) String offset) {

        System.out.println(payload);
        // ...
        acknowledgment.acknowledge();
    }

}

и

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.ack-mode=manual

и

@RunWith(SpringRunner.class)
@SpringBootTest(classes = { So53678801Application.class,
        So53678801ApplicationTests.TestConfig.class})
public class So53678801ApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule embededKafka = new EmbeddedKafkaRule(1, false, "so53678801");

    @BeforeClass
    public static void setup() {
        System.setProperty("spring.kafka.bootstrap-servers",
                embededKafka.getEmbeddedKafka().getBrokersAsString());
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    @Autowired
    private ListenerWrapper wrapper;

    @Test
    public void test() throws Exception {
        this.template.send("so53678801", "{\"bar\":\"baz\"}");
        assertThat(this.wrapper.latch.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(this.wrapper.argsReceived[0]).isInstanceOf(Foo.class);
        assertThat(((Foo) this.wrapper.argsReceived[0]).getBar()).isEqualTo("baz");
        assertThat(this.wrapper.ackCalled).isTrue();
    }

    @Configuration
    public static class TestConfig {

        @Bean
        public static ListenerWrapper bpp() { // BPPs have to be static
            return new ListenerWrapper();
        }

    }

    public static class ListenerWrapper implements BeanPostProcessor, Ordered {

        private final CountDownLatch latch = new CountDownLatch(1);

        private Object[] argsReceived;

        private boolean ackCalled;

        @Override
        public int getOrder() {
            return Ordered.HIGHEST_PRECEDENCE;
        }

        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            if (bean instanceof Listener) {
                ProxyFactory pf = new ProxyFactory(bean);
                pf.setProxyTargetClass(true); // unless the listener is on an interface
                pf.addAdvice(interceptor());
                return pf.getProxy();
            }
            return bean;
        }

        private MethodInterceptor interceptor() {
            return invocation -> {
                if (invocation.getMethod().getName().equals("processIncomingMessage")) {
                    Object[] args = invocation.getArguments();
                    this.argsReceived = Arrays.copyOf(args, args.length);
                    Acknowledgment ack = (Acknowledgment) args[1];
                    args[1] = (Acknowledgment) () -> {
                        this.ackCalled = true;
                        ack.acknowledge();
                    };
                    try {
                        return invocation.proceed();
                    }
                    finally {
                        this.latch.countDown();
                    }
                }
                else {
                    return invocation.proceed();
                }
            };
        }

    }

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...