Я хотел бы использовать соединитель приемника s3 kafka connect для потоковой передачи данных нашей темы в корзину s3. Данные внутри темы будут сообщениями XML. В соответствии с конфигурацией соединителя, мы можем определить формат сообщения (например, JsonFormat)
В соответствии со смежными документами, похоже, что мы можем определить пользовательский формат путем реализации
io.confluent.connect.storage.format.Format.
Я просматривал доступный код формата, такой как JsonFormat, похоже, что настоящая логика формата находится в JsonRecordWriterProvider, который является реализацией io.confluent.connect.storage.format.RecordWriterProvider
Я вижу, что реализация записи RecordWriter, которая применяет метод преобразования JsonConvertor надSinkRecord.value ().
Как мы можем узнать, что содержит SinkRecord, и можем ли мы просто написать xmlconvertor и преобразовать SinkRecord.value () в объект dom и т. Д.?
Любые ссылки, которыеМогли бы вы использовать его для реализации?
Я просматривал код, предоставленный confluent.
https://github.com/confluentinc/kafka-connect-storage-cloud/tree/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format