Определить пользовательский формат для коннектора раковины s3 kafka connect - PullRequest
0 голосов
/ 28 октября 2019

Я хотел бы использовать соединитель приемника s3 kafka connect для потоковой передачи данных нашей темы в корзину s3. Данные внутри темы будут сообщениями XML. В соответствии с конфигурацией соединителя, мы можем определить формат сообщения (например, JsonFormat)

В соответствии со смежными документами, похоже, что мы можем определить пользовательский формат путем реализации
io.confluent.connect.storage.format.Format.

Я просматривал доступный код формата, такой как JsonFormat, похоже, что настоящая логика формата находится в JsonRecordWriterProvider, который является реализацией io.confluent.connect.storage.format.RecordWriterProvider

Я вижу, что реализация записи RecordWriter, которая применяет метод преобразования JsonConvertor надSinkRecord.value ().

Как мы можем узнать, что содержит SinkRecord, и можем ли мы просто написать xmlconvertor и преобразовать SinkRecord.value () в объект dom и т. Д.?

Любые ссылки, которыеМогли бы вы использовать его для реализации?

Я просматривал код, предоставленный confluent.

https://github.com/confluentinc/kafka-connect-storage-cloud/tree/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format

...