Kafka Connect и fetch.max.wait.ms & fetch.min.bytes не соблюдаются? - PullRequest
0 голосов
/ 01 октября 2019

Я создаю собственный SinkConnector с использованием Kafka Connect (2.3.0), который необходимо оптимизировать для пропускной способности, а не для задержки. В идеале, я хочу, чтобы:

Пакеты по ~ 20 мегабайт или 100 тыс. Записей независимо от того, что произойдет раньше, но если скорость передачи сообщений низкая, обрабатывайте по крайней мере каждую минуту (избегайте небольших пакетов, но минимальная скорость MySinkTask.put ()чтобы быть каждую минуту).

Это то, что я установил для пользовательских настроек в попытке достичь этого:

  • consumer.max.poll.records = 100000
  • consumer.fetch.max.bytes = 20971520
  • consumer.fetch.max.wait.ms = 60000
  • consumer.max.poll.interval.ms = 120000
  • consumer.fetch.min.bytes = 1048576

    Мне нужен этот параметр fetch.min.bytes, иначе MySinkTask.put () вызывается несколько раз в секунду, несмотря на другие настройки. ..?

Теперь, что я наблюдаю в ситуации низкой скорости, это то, что MySinkTask.put () вызывается с 0 записями несколько раз и проходит несколько минут, покаfetch.min.bytes достигнут, и затем я получаю их все сразу.

Я до сих пор не понимаю:

  • Почему fetch.max.wait.ms = 60000 не переходит от потребителя к вызову put () моего соединителя? Разве это не должно иметь приоритет над fetch.min.bytes?
  • Какой параметр контролирует ~ 2x в секунду вызов MySinkTask.put (), если fetch.min.bytes = 1 (по умолчанию)? Я не понимаю, почему это происходит, даже подробный вывод настроек времени выполнения Connect не показывает интервал ниже кратных секунд.

Я дважды проверил вывод журнала истроки INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:, напечатанные в среде выполнения Connect, показывают ожидаемые значения при передаче с префиксом consumer..

1 Ответ

0 голосов
/ 01 октября 2019

Кажется невозможным выполнение процесса «по крайней мере, каждый интервал », поскольку параметр потребителя fetch.min.bytes имеет преимущество, а Connect не позволяет динамически настраивать ConsumerConfig во время выполнения Задачи. : - (

В настоящее время обходным путем является пакетное задание в Задаче вручную; установите fetch.min.bytes в 1 (yikes), буферизуйте записи в Задаче при вызовах put () и очищайте при необходимости. Это неочень идеальный вариант, поскольку он подразумевает некоторые накладные расходы для коннектора, которых я надеялся избежать.

Логика того, как Connect выполняет пакетную обработку ~ 2 раза в секунду из опроса своего потребителя в SinkTask.put (), остается для меня загадкой, ноэто лучше, чем вызываться для каждого сообщения.

...