Я создаю собственный SinkConnector с использованием Kafka Connect (2.3.0), который необходимо оптимизировать для пропускной способности, а не для задержки. В идеале, я хочу, чтобы:
Пакеты по ~ 20 мегабайт или 100 тыс. Записей независимо от того, что произойдет раньше, но если скорость передачи сообщений низкая, обрабатывайте по крайней мере каждую минуту (избегайте небольших пакетов, но минимальная скорость MySinkTask.put ()чтобы быть каждую минуту).
Это то, что я установил для пользовательских настроек в попытке достичь этого:
- consumer.max.poll.records = 100000
- consumer.fetch.max.bytes = 20971520
- consumer.fetch.max.wait.ms = 60000
- consumer.max.poll.interval.ms = 120000
consumer.fetch.min.bytes = 1048576
Мне нужен этот параметр fetch.min.bytes, иначе MySinkTask.put () вызывается несколько раз в секунду, несмотря на другие настройки. ..?
Теперь, что я наблюдаю в ситуации низкой скорости, это то, что MySinkTask.put () вызывается с 0 записями несколько раз и проходит несколько минут, покаfetch.min.bytes достигнут, и затем я получаю их все сразу.
Я до сих пор не понимаю:
- Почему fetch.max.wait.ms = 60000 не переходит от потребителя к вызову put () моего соединителя? Разве это не должно иметь приоритет над fetch.min.bytes?
- Какой параметр контролирует ~ 2x в секунду вызов MySinkTask.put (), если fetch.min.bytes = 1 (по умолчанию)? Я не понимаю, почему это происходит, даже подробный вывод настроек времени выполнения Connect не показывает интервал ниже кратных секунд.
Я дважды проверил вывод журнала истроки INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
, напечатанные в среде выполнения Connect, показывают ожидаемые значения при передаче с префиксом consumer.
.