Как сделать так, чтобы сообщения доходили до кафки брокера? - PullRequest
0 голосов
/ 08 февраля 2019

У меня есть производитель сообщений на моем локальном компьютере и брокер на удаленном хосте (aws).

После отправки сообщения от производителя я жду и вызываю потребителя консоли на удаленном хосте и вижучрезмерные журналы.Без значения источника.

Производитель сбрасывает данные после вызова метода send.Все настроено правильно.

Как проверить, получил ли брокер сообщение от производителя, и узнать, получил ли производитель ответ?

Ответы [ 3 ]

0 голосов
/ 09 февраля 2019

Вы можете попробовать get () API отправки, который вернет Future of RecordMetadata

ProducerRecord<String, String> record = 
new ProducerRecord<>("SampleTopic", "SampleKey", "SampleValue");

try {
    producer.send(record).get(); 
} catch (Exception e) {
    e.printStackTrace(); 
}

0 голосов
/ 12 февраля 2019

Используйте доставку только один раз, и вам не нужно беспокоиться о том, достигнуто ли ваше сообщение: https://www.baeldung.com/kafka-exactly-once, https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/

0 голосов
/ 08 февраля 2019

Метод Send асинхронно отправляет сообщение в тему и возвращает Future из RecordMetadata.

java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)

Асинхронно отправляет запись вtopic

После вызова flush проверьте, чтобы завершить Future, вызвав метод isDone.(например, Future.isDone() == true)

Вызов этого метода делает сразу доступными для отправки все буферизованные записи (даже если linger.ms больше 0) и блокирует завершение запросов, связанных с этимизаписей.Постусловие flush () состоит в том, что любая ранее отправленная запись будет завершена (например, Future.isDone () == true).Запрос считается выполненным, когда он успешно подтвержден в соответствии с заданной вами конфигурацией acks, или же он приводит к ошибке.

* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *

public int partition ()

Раздел, на который была отправлена ​​запись

public long offset ()

смещение записи или -1, если {hasOffset ()} возвращает false.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...