Потребительский юнит-тест с poll () никогда ничего не получает - PullRequest
0 голосов
/ 30 сентября 2018

Рассмотрим следующий код:

@Test(singleThreaded = true)
public class KafkaConsumerTest
{
  private KafkaTemplate<String, byte[]> template;
  private DefaultKafkaConsumerFactory<String, byte[]> consumerFactory;
  private static final KafkaEmbedded EMBEDDED_KAFKA;
  static {
      EMBEDDED_KAFKA = new KafkaEmbedded(1, true, "topic");
      try { EMBEDDED_KAFKA.before(); } catch (final Exception e) { e.printStackTrace(); }
    }

  @BeforeMethod
  public void setUp() throws Exception {
    final Map<String, Object> senderProps = KafkaTestUtils.senderProps(EMBEDDED_KAFKA.getBrokersAsString());
    senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    final ProducerFactory<String, byte[]> pf = new DefaultKafkaProducerFactory<>(senderProps);
    this.template = new KafkaTemplate<>(pf);
    this.template.setDefaultTopic("topic");
    final Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sender", "false", EMBEDDED_KAFKA);
    this.consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
    this.consumerFactory.setValueDeserializer(new ByteArrayDeserializer());
    this.consumerFactory.setKeyDeserializer(new StringDeserializer());
  }

  @Test
  public void testSendToKafka() throws InterruptedException, ExecutionException, TimeoutException {
    final String message = "42";
    final Message<byte[]> msg = MessageBuilder.withPayload(message.getBytes(StandardCharsets.UTF_8)).setHeader(KafkaHeaders.TOPIC, "topic").build();
    this.template.send(msg).get(10, TimeUnit.SECONDS);
    final Consumer<String, byte[]> consumer = this.consumerFactory.createConsumer();
    consumer.subscribe(Collections.singleton("topic"));
    final ConsumerRecords<String, byte[]> records = consumer.poll(10000);
    Assert.assertTrue(records.count() > 0);
    Assert.assertEquals(new String(records.iterator().next().value(), StandardCharsets.UTF_8), message);
    consumer.commitSync();
  }
}

Я пытаюсь отправить сообщение на KafkaTemplate и прочитать его снова, используя Consumer.poll().Я использую тестовый фреймворк TestNG .

Отправка работает, я проверил, используя "обычный" код, который я нашел в сети (зарегистрируйте прослушиватель сообщений на KafkaMessageListenerContainer).

Только я никогда не получаю ничего от потребителя.Я пробовал ту же последовательность (создать Consumer, poll()) против "реальной" установки Kafka, и она работает.

Значит, похоже, что-то не так с тем, как я настроил ConsumerFactory?Любая помощь будет принята с благодарностью!

1 Ответ

0 голосов
/ 01 октября 2018

Вам нужно использовать

EMBEDDED_KAFKA.consumeFromAnEmbeddedTopic(consumer, "topic");

перед публикацией записей через KafkaTemplate.

А затем в конце теста для проверки вам нужно использовать что-то вроде этого:

ConsumerRecord<String, String> record = KafkaTestUtils.getSingleRecord(consumer,  "topic");

Вы также можете использовать его так, как вы делаете, только то, что вам не хватает, это ConsumerConfig.AUTO_OFFSET_RESET_CONFIG как earliest, потому что по умолчанию это latest.Таким образом, потребитель, добавленный в тему позже, не увидит ранее опубликованных записей.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...