Мы используем spring-kafka-test-2.2.8-RELEASE. Когда я использую шаблон для отправки сообщения, он правильно запускает прослушиватель, но я не могу получить содержимое сообщения в consumer.poll. Если я создаю экземпляр KafkaTemplate без «связывания» его в атрибуте класса и создаю его экземпляр на основе фабрики производителя, он отправляет сообщение, но не запускает @KafkaListener, работает, только если я устанавливаю прослушиватель сообщений в методе @Test. Мне нужно запустить слушатель kafka и понять, какие Topi c будут вызваны следующими (topi c "sucess" при выполнении без ошибок и errorTopi c, слушатель вызывает исключение) и содержимое сообщения.
@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = { "tp-in-gco-mao-notasfiscais" })
public class InvoicingServiceTest {
@Autowired
private NFKafkaListener nfKafkaListener;
@ClassRule
public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1, false, "tp-in-gco-mao-
notasfiscais");
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private String brokerAddresses;
@Autowired
private KafkaTemplate<Object, Object> template;
@BeforeClass
public static void setup() {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY,
"spring.kafka.bootstrap-servers");
}
@Test
public void testTemplate() throws Exception {
NFServiceTest nfServiceTest = spy(new NFServiceTest());
nfKafkaListener.setNfServiceClient(nfServiceTest);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("teste9", "false", broker.getEmbeddedKafka());
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, InvoiceDeserializer.class);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
DefaultKafkaConsumerFactory<Integer, Object> cf = new DefaultKafkaConsumerFactory<Integer, Object>(
consumerProps);
Consumer<Integer, Object> consumer = cf.createConsumer();
broker.getEmbeddedKafka().consumeFromAnEmbeddedTopic(consumer, "tp-in-gco-mao-notasfiscais");
ZfifNfMao zf = new ZfifNfMao();
zf.setItItensnf(new Zfietb011());
Zfietb011 zfietb011 = new Zfietb011();
Zfie011 zfie011 = new Zfie011();
zfie011.setMatkl("TESTE");
zfietb011.getItem().add(zfie011);
zf.setItItensnf(zfietb011);
template.send("tp-in-gco-mao-notasfiscais", zf);
List<ConsumerRecord<Integer, Object>> received = new ArrayList<>();
int n = 0;
while (received.size() < 1 && n++ < 10) {
ConsumerRecords<Integer, Object> records1 = consumer.poll(Duration.ofSeconds(10));
//records1 is always empty
if (!records1.isEmpty()) {
records1.forEach(rec -> received.add(rec));
}
}
assertThat(received).extracting(rec -> {
ZfifNfMao zfifNfMaoRdesponse = (ZfifNfMao) rec.value();
return zfifNfMaoRdesponse.getItItensnf().getItem().get(0).getMatkl();
}).contains("TESTE");
broker.getEmbeddedKafka().getKafkaServers().forEach(b -> b.shutdown());
broker.getEmbeddedKafka().getKafkaServers().forEach(b -> b.awaitShutdown());
consumer.close();
}
public static class NFServiceTest implements INFServiceClient {
CountDownLatch latch = new CountDownLatch(1);
@Override
public ZfifNfMaoResponse enviarSap(ZfifNfMao zfifNfMao) {
ZfifNfMaoResponse zfifNfMaoResponse = new ZfifNfMaoResponse();
zfifNfMaoResponse.setItItensnf(new Zfietb011());
Zfietb011 zfietb011 = new Zfietb011();
Zfie011 zfie011 = new Zfie011();
zfie011.setMatkl("TESTE");
zfietb011.getItem().add(zfie011);
zfifNfMaoResponse.setItItensnf(zfietb011);
return zfifNfMaoResponse;
}
}
}