Нет ожидающего ответа: ConsumerRecord - PullRequest
0 голосов
/ 13 декабря 2018

Я пытаюсь использовать ReplyingKafkaTemplate и периодически вижу сообщение ниже.

Нет ожидающего ответа: ConsumerRecord (topic = request-reply-topic, partition = 8, offset = 1,CreateTime = 1544653843269, размер сериализованного ключа = -1, размер сериализованного значения = 1609, заголовки = RecordHeaders (заголовки = [RecordHeader (ключ = kafka_correlationId, значение = [-14, 65, 21, -118, 70, -94, 72,87, -113, -91, 92, 72, -124, -110, -64, -94])], isReadOnly = false), ключ = ноль, с id_корреляции: [-18271255759235816475365319231847350110], возможно, тайм-аут или использованиеобщая тема ответа

Это будет происходить из приведенного ниже кода

RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
if (future == null) {
  if (this.sharedReplyTopic) {
    if (this.logger.isDebugEnabled()) {
      this.logger.debug(missingCorrelationLogMessage(record, correlationId));
    }
  }
  else if (this.logger.isErrorEnabled()) {
    this.logger.error(missingCorrelationLogMessage(record, correlationId));
  }
}

Но это происходит только случайно

Я также установил для параметра replyTopic значение false, как показано нижеи попытался увеличить время ожидания

ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate = new ReplyingKafkaTemplate<>(pf, container);
        replyKafkaTemplate.setSharedReplyTopic(false);
        replyKafkaTemplate.setReplyTimeout(10000);
        return replyKafkaTemplate;

Мой Контейнер такой, как показано ниже

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    factory.setBatchListener(false);
    factory.getContainerProperties().setPollTimeout(1000);
    factory.getContainerProperties().setIdleEventInterval(10000L);
    factory.setConcurrency(3);
    factory.setReplyTemplate(kafkaTemplate());
    return factory;
}

1 Ответ

0 голосов
/ 13 декабря 2018

Если оно прерывистое, скорее всего, ответ пришел слишком долго.Сообщение кажется вполне ясным

возможно, истекло время ожидания или используется общая тема ответа

Каждый экземпляр на стороне клиента должен использовать свою собственную тему ответа или выделенный раздел.

РЕДАКТИРОВАТЬ

Вы получаете журнал, если получено сообщение с идентификатором корреляции, который не соответствует записям, находящимся в настоящее время в этом.futures (ожидающие ответы).Это может произойти только при следующих обстоятельствах:

  1. Время ожидания запроса истекло (в этом случае появится соответствующий журнал WARN).
  2. Шаблоном является stop () ped (в этом случае this.futures очищается).
  3. Уже обработанный ответ по какой-то причине доставлен (не должно происходить).
  4. Ответ получен до того, как ключ добавлен в this.futures (не может произойти, поскольку он вставлен до отправки () записи).
  5. На стороне сервера отправляется 2 или более ответов.для того же запроса.
  6. Некоторые другие приложения отправляют данные в ту же тему ответа.Если вы можете воспроизвести его с помощью журнала DEBUG, это поможет, потому что тогда мы также регистрируем ключ корреляции при отправке.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...