Есть ли способ сделать опрос Kafka с использованием Spring-Kafka - который возвращает список новых сообщений? - PullRequest
0 голосов
/ 04 октября 2018

Я хотел бы знать, есть ли какая-либо опция в spring-kafka, которая будет захватывать все новые сообщения в список.

Например, если я слушаю объект Message, я хочу получить List<Message> с момента последнего опроса.Что-то вроде:

@KafkaListener(poll-interval=1000, topics = "${kafka.topic}", containerFactory = "objectListListenerContainerFactory", )
public void messageListener(List<Message> messages) {
    log.info("Count of new messages since last poll : {}", messages.size());
}

Я уже прошел Spring Kafka: Опрос новых сообщений вместо того, чтобы получать уведомление с помощью `onMessage` .Но было не очень полезно для меня.

1 Ответ

0 голосов
/ 04 октября 2018

С помощью Spring Spring Kafka вы можете использовать ConsumerFactory для создания KafkaConsumer, и тогда вы сможете опросить записи там, где вам будет удобно.

Также обратите внимание, чтоKafkaMessageListenerContainer может быть приостановлено в произвольный момент, чтобы прекратить опрос, но все равно быть подключенным к группе потребителей.

В расширении Spring Integration Kafka есть новый KafkaMessageSource для этого видаЗадачи:

/**
 * Polled message source for kafka. Only one thread can poll for data (or
 * acknowledge a message) at a time.
 * <p>
 * NOTE: If the application acknowledges messages out of order, the acks
 * will be deferred until all messages prior to the offset are ack'd.
 * If multiple records are retrieved and an earlier offset is requeued, records
 * from the subsequent offsets will be redelivered - even if they were
 * processed successfully. Applications should therefore implement
 * idempotency.
 *
 * @param <K> the key type.
 * @param <V> the value type.
 *
 * @author Gary Russell
 * @author Mark Norkin
 * @author Artem Bilan
 *
 * @since 3.0.1
 *
 */
public class KafkaMessageSource<K, V> extends AbstractMessageSource<Object>
implements DisposableBean, Lifecycle {

Я думаю, нам необходимо рассмотреть это в справочном руководстве Spring for Apache Kafka: https://docs.spring.io/spring-kafka/docs/current/reference/html/_spring_integration.html#si-kafka. Итак, не стесняйтесь поднимать вопрос в проекте spring-kafkaвосполнить этот пробел.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...