Я хочу прочитать сообщения из топика, отправить их в другой топик и подтвердить (acknowledge) полученное сообщение только после успешной отправки в другой топик. Мой код, написанный по такому примеру, работает, и тестовый пример тоже запускается,
но класс слушателя не отображается в покрытии кода.
Java
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Service
@Getter
@Slf4j
public class KafkaConsumerListener {
@Autowired
@Qualifier("stringTemplate")
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${topics.producerTopic}")
private String producerTopic;
@Value("${message.key}")
private String key;
@Value("${message.value}")
private String value;
@KafkaListener(topics = "${topics.consumerTopic}", groupId = "${topics.consumerGroupId}")
public void consume(@Payload String message, Acknowledgment acknowledgment) {
LogUtil.debug(log, "consume", "Entering: consume in KafkaConsumerListener");
if (StringUtils.isEmpty(message)) {
LogUtil.info(log, "getStringAsMap", null,"Empty or null message received");
} else if (message.startsWith("Value{")) {
HashMap<String, String> map = getHashMap(message);
sendSyncMessage(map, acknowledgment);
}
}
private void sendSyncMessage(HashMap<String, String> hashMap, Acknowledgment acknowledgment) {
try {
kafkaTemplate.send(producerTopic, hashMap.get(key), hashMap.get(value)).get(10, TimeUnit.SECONDS);
acknowledgment.acknowledge();
LogUtil.info(log, "sendSyncMessage", hashMap.get(key),
"Message sent Success:" + hashMap.get(key));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LogUtil.error(log, "sendSyncMessage", hashMap.get(logId),
"Producer failed to send message : " + hashMap.get(logId), e.getLocalizedMessage());
}
}
public HashMap<String, String> getHashMap( String message) {
// Some Logic
return hashMap;
}
}
Тестовый класс для слушателя, читающего сообщения из Kafka:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertThat;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasKey;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasValue;
/**
 * Round-trip test against an embedded Kafka broker: records published with a locally
 * built {@link KafkaTemplate} are read back by a locally built listener container.
 *
 * <p>This test creates its own container and template and never references
 * {@code KafkaConsumerListener}, so it does not execute that class and cannot contribute
 * to its code coverage.
 *
 * <p>NOTE(review): the class uses JUnit 4 ({@code @ClassRule}, {@code org.junit.Test})
 * without a Spring runner, so {@code @SpringBootTest} presumably does not start an
 * application context here — confirm.
 */
@SpringBootTest
public class KafkaTemplateTests {

    private static final String TEMPLATE_TOPIC = "test-topic";
    private static final String KEY = "KEY";
    private static final String MESSAGE = "MESSAGE";
    private static final String EVENT_ID = "EVENT_ID";

    /** Single embedded broker with controlled shutdown and the test topic pre-created. */
    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);

    /**
     * Publishes three records (value only, key with empty value, key and value taken from
     * a fixture map) and asserts that each one is received by the listener container.
     *
     * @throws Exception if waiting for partition assignment or polling is interrupted
     */
    @Test
    public void testTemplate() throws Exception {
        // Consumer side: a standalone listener container that pushes every record into a queue.
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
                embeddedKafka.getEmbeddedKafka());
        DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
        cf.setKeyDeserializer(new StringDeserializer());
        cf.setValueDeserializer(new StringDeserializer());
        ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
        KafkaMessageListenerContainer<String, String> container =
                new KafkaMessageListenerContainer<>(cf, containerProperties);
        final BlockingQueue<ConsumerRecord<String, String>> records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, String>) record -> {
            System.out.println(record);
            records.add(record);
        });
        container.setBeanName("templateTests");

        // Producer side: a template bound to the embedded broker.
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(producerProps);
        pf.setKeySerializer(new StringSerializer());
        KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
        template.setDefaultTopic(TEMPLATE_TOPIC);

        container.start();
        try {
            ContainerTestUtils.waitForAssignment(container,
                    embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());

            template.sendDefault("foo");
            assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));

            template.send(TEMPLATE_TOPIC, "DummyValue", "");
            ConsumerRecord<String, String> received = records.poll(10, TimeUnit.SECONDS);
            assertThat(received, hasKey("DummyValue"));
            assertThat(received, hasValue(""));

            HashMap<String, String> hashMap = getHashMap();
            template.send(TEMPLATE_TOPIC, hashMap.get(KEY), hashMap.get(MESSAGE));
            received = records.poll(10, TimeUnit.SECONDS);
            assertThat(received, hasKey(hashMap.get(KEY)));
            assertThat(received, hasValue(hashMap.get(MESSAGE)));
        } finally {
            // Always release the consumer thread and the producer, even when an assertion
            // fails, so they do not leak into other tests sharing the embedded broker.
            container.stop();
            pf.destroy();
        }
    }

    /**
     * Builds the fixture map used by the last round trip. The original test called this
     * helper without defining it; the values here are arbitrary test data.
     *
     * @return a map with entries for {@code KEY}, {@code MESSAGE} and {@code EVENT_ID}
     */
    private static HashMap<String, String> getHashMap() {
        HashMap<String, String> fixture = new HashMap<>();
        fixture.put(KEY, "test-key");
        fixture.put(MESSAGE, "test-message");
        fixture.put(EVENT_ID, "test-event-id");
        return fixture;
    }
}