Пытаясь использовать буфер потока akka для улучшения пропускной способности моего потока, мне интересно, как это применимо к Kafka
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
, в частности,
val kafkaSource =
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.buffer(10000, OverflowStrategy.backpressure)
Что именнопроизойдет здесь относительно базового API Kafka?
У меня есть следующая конфигурация на базовом клиенте Kafka:
.withProperty(AUTO_OFFSET_RESET_CONFIG, offsetReset)
.withProperty(MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMs.toString)
.withProperty(SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs.toString)
.withProperty(HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatIntervalMs.toString)
.withProperty(FETCH_MAX_WAIT_MS_CONFIG, fetchMaxWaitMs.toString)
.withProperty(MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
.withProperty(FETCH_MAX_BYTES_CONFIG, maxPollRecords.toString)
.withProperty(MAX_PARTITION_FETCH_BYTES_CONFIG, maxPollRecords.toString)
Следовательно, у меня есть MAX_POLL_RECORDS_CONFIG
, FETCH_MAX_BYTES_CONFIG
и MAX_PARTITION_FETCH_BYTES_CONFIG
Что мне интересноЭто то, как буфер будет играть по отношению к выборке, настроенной на базовом клиенте.
- Материализован ли
Consumer.committableSource
в его собственном Actor, и получать ли сообщение от базового клиента kafka через его буфер? Допустим, базовый клиент настроен на получение до миллиона сообщений, а Actor - в качестве буфера 1000
? Что это значит? Что происходит? Перезаписывает ли буфер Actor запрос опроса клиента kafka или он получает данные в своем почтовом ящике, который отправляет клиент Kafka, до тех пор, пока результат его опроса (максимум, настроенный в базовом клиенте) не будет пройден?
Если вообще кто-то может открыть окно объяснения того, как внутренний и / или явный буфер потока kafka взаимодействует с настройками запроса опроса, это действительно поможет.
Спасибо