настройки sping kafka для fetch-min-size & max-poll-records не работают должным образом - PullRequest
0 голосов
/ 27 марта 2019

Я работаю над загрузочным приложением Spring с spring kafka, которое прослушивает отдельную тему kafka, а затем разделяет записи по соответствующим категориям, создает из него файл json и загружает его в AWS S3.

Я получаю огромные объемы данных в темах Kafka, и мне нужно убедиться, что файлы json разбиты на части соответственно, чтобы ограничить количество загружаемых json в S3.

Ниже приведена моя application.yml конфигурация для потребителя кафки.

spring:
  kafka:
    consumer:
      group-id: newton
      auto-offset-reset: earliest
      fetch-max-wait: 
        seconds: 1 
      fetch-min-size: 500000000
      max-poll-records: 50000000
      value-deserializer: com.forwarding.application.consumer.model.deserializer.MeasureDeserializer

Я создал слушателя для постоянного чтения темы.

Даже с указанной конфигурацией я получаю записи в консоли следующим образом:

   2019-03-27T15:25:56.02+0530 [APP/PROC/WEB/0] OUT 2019-03-27 09:55:56.024  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 56. No Of measures: 60
   2019-03-27T15:25:56.21+0530 [APP/PROC/WEB/2] OUT 2019-03-27 09:55:56.210  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 80. No Of measures: 96
   2019-03-27T15:25:56.56+0530 [APP/PROC/WEB/0] OUT 2019-03-27 09:55:56.560  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 76. No Of measures: 39
   2019-03-27T15:25:56.73+0530 [APP/PROC/WEB/2] OUT 2019-03-27 09:55:56.732  INFO 8 --- [ntainer#0-0-C-1] c.s.n.f.a.s.impl.ConsumerServiceImpl     : Time taken(ms) 77. No Of measures: 66

Может кто-нибудь сообщить мне, что можно настроить для получения полученных записей в соответствии с настройкой в ​​application.yml?

1 Ответ

2 голосов
/ 27 марта 2019

Я только что скопировал вашу конфигурацию (кроме максимального ожидания - посмотрите, какой синтаксис я использовал), и он работал нормально ...

spring:
  kafka:
    consumer:
      group-id: newton
      auto-offset-reset: earliest
      fetch-max-wait: 1s
      fetch-min-size: 500000000
      max-poll-records: 50000000
2019-03-27 13:43:55.454  INFO 98982 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 1000
    fetch.min.bytes = 500000000
    group.id = newton
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 50000000
    ...

Вы устанавливаете произвольные свойства, которые непосредственно не поддерживаются при загрузкесвойства, используя свойство ...properties.

например,

spring:
  kafka:
    consumer:
      properties:
        max.poll.interval.ms: 300000

или

spring:
  kafka:
    consumer:
      properties:
         max:
           poll:
             interval:
               ms: 300000

Документация находится здесь .

Свойства, поддерживаемыеАвтоматическая настройка показана в Приложении A, Общие свойства приложения.Обратите внимание, что по большей части эти свойства (дефис или camelCase) отображаются непосредственно в точечные свойства Apache Kafka.Подробнее см. В документации Apache Kafka.

Первые несколько из этих свойств применяются ко всем компонентам (производителям, потребителям, администраторам и потокам), но могут быть указаны на уровне компонентов, если вы хотите использовать другие значения,Apache Kafka обозначает свойства со значением HIGH, MEDIUM или LOW.Автоконфигурация Spring Boot поддерживает все свойства высокой важности, некоторые выбранные свойства MEDIUM и LOW и любые свойства, которые не имеют значения по умолчанию.

Только подмножество свойств, поддерживаемых Kafka, доступно непосредственно через KafkaPropertiesучебный класс.Если вы хотите настроить производителя или потребителя с дополнительными свойствами, которые не поддерживаются напрямую, используйте следующие свойства:

spring.kafka.properties.prop.one=first

spring.kafka.admin.properties.prop.two=second

spring.kafka.consumer.properties.prop.three=third

spring.kafka.producer.properties.prop.four=fourth

spring.kafka.streams.properties.prop.five=fifth

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...