Как получать сообщения одно за другим из темы кафки - PullRequest
0 голосов
/ 30 января 2019

Я создал одну тему кафки с одним разделом.

kafka-topics --create --topic files.write --if-not-exists --zookeeper zookeeper:32181 --partitions 1 --replication-factor 1

Можно было бы отправить много сообщений в этой теме.

Но я бы хотел, чтобы один потребитель (для данной группы) обрабатывал эти сообщения по одному.

spring:
  application:
    name: file-consumer
  cloud:
    stream:
      kafka:
        binder:
          type: kafka
          brokers: localhost
          defaultBrokerPort: 29092
          defaultZkPort: 32181
          configuration:
            max.request.size: 300000
            max.message.bytes: 300000
        bindings:
          fileWriteBindingInput:
            consumer:
              autoCommitOffset: false
      bindings:
        fileWriteBindingInput:
          binder: kafka
          destination: files.write
          group: ${spring.application.name}
          contentType: 'text/plain'

И пример кода Java

@StreamListener(FileBindingProcessor.INPUT_FILE_WRITE)
public void onInputMessage(Message<String> message, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {

    // I Would like here to synchronize the processing of messages one by one
    // But, If many messages are pushed to this topic (single partition), they will be processed asynchronously event if I didn't yet acknowledge the current message

    acknowledgment.acknowledge();
}

Чтоотсутствует в моей конфигурации?

Я думал, что пока сообщение не подтверждено (смещение не увеличено), ни одно другое сообщение не поступает из того же раздела.

Ответы [ 3 ]

0 голосов
/ 30 января 2019

Вы можете установить этот потребительский конфиг max.poll.records на 1, по умолчанию это 500

max.poll.records

Максимальное количество возвращаемых записейв одном вызове poll ().

0 голосов
/ 30 января 2019

Неподтверждение сообщения не имеет никакого отношения к прекращению доставки следующего сообщения.

Вы не можете передать сообщение другому потоку и подтвердить его позже;если вы хотите однопоточную обработку, вы должны выполнить всю обработку в потоке слушателя.

0 голосов
/ 30 января 2019

Если autoCommitOffset включено (это значение по умолчанию), то связыватель уже подтвердит каждую запись.Таким образом, к тому времени, как он достигает вашего StreamListener, запись уже подтверждена.

Исправление: вышеприведенное утверждение о StreamListener не совсем верно.Автоматическое подтверждение выполнено, когда слушатель выходит.

Поскольку у вас есть только один раздел, вы получите сообщения в том же порядке, в котором они были отправлены в этот раздел темы.Вы можете отключить autoCommitOffset, в этом случае вы можете использовать подтверждение вручную.

...