Kafka connect - строка не может быть приведена к структуре - PullRequest
0 голосов
/ 11 октября 2019

Я делаю POC из Confluent Kafka Connect версии 5.2.3. Мы пытаемся скопировать сообщение темы из файла в качестве резервной копии и из этого файла обратно в тему, когда нам это нужно.

В теме есть ключ = строка Значение = protbuf

Я использую

key.convertor=org.apache.kafka.connect.storgare.StringConvertor value.convertor=com.blueapron.connect.protobuf.ProtobufConvertor value.convertor.protoClassName=<proto class name>

Мойка конфигурации

name=test
connector.class=FileStreamSink
tasks.max=1
file=test.txt
topics=testtopic

Источникconfig

name=test
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topics=testtopic_connect

Я могу успешно преобразовать его в файл с содержимым файла, как показано ниже

Struct{<message in name value pair>}
Struct{<message in name value pair>}

....

Тот же файл, который я использую для источникавернемся к другой теме. Когда я запускаю источник, он выдает ошибку

Строка не может быть преобразована в org.apache.kafka.connect.data.Struct.

Вопросы

  • Почему яне вижу ни одного ключа в файле, когда моя тема кафки имеет пару ключ-значение.
  • Почему источник не может скопировать содержимое из файла в тему и выдает ошибку, связанную с приведением типа.
  • Я получаюподобная ошибка, когда я использую ByteArrayConvertor, предоставленный kafka. Строка не может быть преобразована в байты. В идеале ByteArrayConvertor должен работать с любыми данными.
  • Работает ли blueapron только с версией protobuf3?

1 Ответ

0 голосов
/ 15 октября 2019

Почему я не вижу ни одного ключа в файле, когда в моей теме кафки есть пара ключ-значение

FileSink не записывает ключ, только значение, выполняя .toString Структурных или неструктурных данных. Существует SMT, чтобы переместить ключ в значение - https://github.com/jcustenborder/kafka-connect-transform-archive

Но строки файла все равно будут выглядеть как Struct{key= ..., value=...}

Почему источник не можетскопировать содержимое из файла в тему и выдать ошибку, связанную с приведением.

Я получаю похожую ошибку, когда использую ByteArrayConvertor, предоставленный kafka. Строка не может быть преобразована в байты. В идеале ByteArrayConvertor должен работать с любыми типами данных.

FileSource считывает только разделенные строкой поля из файла как строки.

Работает ли blueapron только с версией protobuf3?

Кажется, что это так - https://github.com/blueapron/kafka-connect-protobuf-converter/blob/v2.2.1/pom.xml#L21


По сути, если вы хотите полностью создать резервную копию темы (включая метки времени, заголовки и, возможно, даже смещения), вы 'Потребуется что-то, что создает дамп и читает фактические двоичные данные (это не то, что делает ByteArrayConverter, только (de) сериализует данные из / в Kafka, а не из источника / приемника, и коннектор FileSource попытается всегда читать файл обратно как строку )

...