Как получить адрес загрузочного сервера в Kafka Connector, которым сейчас пользуется My Kafka Connect? - PullRequest
0 голосов
/ 20 ноября 2018

Я разрабатываю разъем для раковины Kafka самостоятельно.Мой десериализатор - JSONConverter.Однако, когда кто-то отправляет неверные данные JSON в тему моего коннектора, я хочу пропустить эту запись и отправить эту запись в конкретную тему моей компании.

Я путаюсь: я не могу найти API длямне получить bootstrap.servers моего Connect. (Я знаю, что он находится в каталоге и т.д. конфлуента, но не стоит писать жесткий код каталога «connect-distributed.properties» для получения bootstrap.servers)

Итак, вопрос, есть ли другой способ для меня, чтобы удобно получить значение bootstrap.servers в моей программе коннектора?

1 Ответ

0 голосов
/ 20 ноября 2018

Вместо того чтобы пытаться отправлять «плохие» записи из SinkTask в Kafka, вы должны вместо этого попытаться использовать функцию очереди недоставленных сообщений, которая была добавлена ​​в Kafka Connect 2.0.

Вы можете настроить среду выполнения Connectдля автоматического вывода записей, которые не удалось обработать, в настроенную тему, действующую как DLQ.

Подробнее см. KIP , в котором добавлена ​​эта функция.

...