Покрытие кода для Kafka Listener не работает в Spring Boot? - PullRequest
0 голосов
/ 03 мая 2020

Я хочу прочитать сообщения из топика, отправить их в другой топик и подтвердить полученное сообщение после отправки в другой топик. Мой код, написанный по такому примеру, работает, и тестовый пример тоже выполняется,
но в отчёте о покрытии кода этот класс не отображается.

Java

import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;    
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Service
@Getter
@Slf4j
public class KafkaConsumerListener {

    @Autowired
    @Qualifier("stringTemplate")
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${topics.producerTopic}")
    private String producerTopic;

    @Value("${message.key}")
    private String key;

    @Value("${message.value}")
    private String value;

    @KafkaListener(topics = "${topics.consumerTopic}", groupId = "${topics.consumerGroupId}")
    public void consume(@Payload String message, Acknowledgment acknowledgment) {
        LogUtil.debug(log, "consume", "Entering:  consume in KafkaConsumerListener");
        if (StringUtils.isEmpty(message)) {
            LogUtil.info(log, "getStringAsMap", null,"Empty or null message received");
        } else if (message.startsWith("Value{")) {
            HashMap<String, String> map = getHashMap(message);
            sendSyncMessage(map, acknowledgment);
        }
    }

    private void sendSyncMessage(HashMap<String, String> hashMap, Acknowledgment acknowledgment) {
        try {
            kafkaTemplate.send(producerTopic, hashMap.get(key), hashMap.get(value)).get(10, TimeUnit.SECONDS);
            acknowledgment.acknowledge();
            LogUtil.info(log, "sendSyncMessage", hashMap.get(key),
                    "Message sent Success:" + hashMap.get(key));
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            LogUtil.error(log, "sendSyncMessage", hashMap.get(logId),
                    "Producer failed to send message : " + hashMap.get(logId), e.getLocalizedMessage());
        }
    }

     public HashMap<String, String> getHashMap( String message) {
        // Some Logic
        return hashMap;
}

}

Тестовый класс для слушателей, читающих сообщения из Kafka

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertThat;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasKey;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasValue;

/**
 * Round-trip test for a {@link KafkaTemplate} against an embedded Kafka broker.
 *
 * <p>The test builds its own listener container and template; it never references
 * {@code KafkaConsumerListener}, so it does not execute (or produce coverage for)
 * that class.
 */
@SpringBootTest
public class KafkaTemplateTests {

    private static final String TEMPLATE_TOPIC = "test-topic";
    private static final String KEY = "KEY";
    private static final String MESSAGE = "MESSAGE";
    private static final String EVENT_ID = "EVENT_ID";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);

    /**
     * Sends three records to {@code TEMPLATE_TOPIC} and checks that a test-local
     * listener container receives each of them with the expected key and value.
     *
     * @throws Exception if waiting for assignment, polling, or cleanup fails
     */
    @Test
    public void testTemplate() throws Exception {
        // Consumer side: a standalone container that pushes every record into a queue.
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
                embeddedKafka.getEmbeddedKafka());
        DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        cf.setKeyDeserializer(new StringDeserializer());
        cf.setValueDeserializer(new StringDeserializer());
        ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
        KafkaMessageListenerContainer<String, String> container =
                new KafkaMessageListenerContainer<>(cf, containerProperties);
        final BlockingQueue<ConsumerRecord<String, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener(new MessageListener<String, String>() {

            @Override
            public void onMessage(ConsumerRecord<String, String> record) {
                System.out.println(record);
                records.add(record);
            }

        });
        container.setBeanName("templateTests");

        // Producer side: a template bound to the embedded broker.
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(producerProps);
        pf.setKeySerializer(new StringSerializer());
        KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
        template.setDefaultTopic(TEMPLATE_TOPIC);

        container.start();
        try {
            ContainerTestUtils.waitForAssignment(container,
                    embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());

            template.sendDefault("foo");
            assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));

            template.send(TEMPLATE_TOPIC, "DummyValue", "");
            ConsumerRecord<String, String> received = records.poll(10, TimeUnit.SECONDS);
            assertThat(received, hasKey("DummyValue"));
            assertThat(received, hasValue(""));

            // NOTE(review): getHashMap() is not defined in this class as shown;
            // presumably a fixture helper elided from the snippet - confirm.
            HashMap<String, String> hashMap = getHashMap();
            template.send(TEMPLATE_TOPIC, hashMap.get(KEY), hashMap.get(MESSAGE));
            received = records.poll(10, TimeUnit.SECONDS);
            assertThat(received, hasKey(hashMap.get(KEY)));
            assertThat(received, hasValue(hashMap.get(MESSAGE)));
        } finally {
            // Release the listener container and the producer even when an assertion fails.
            container.stop();
            pf.destroy();
        }
    }

}
...