Поддержка коннектора kafka-connect-hdfs для сохранения массива байтов и разделения полей с использованием схемы FlatBuffer - PullRequest
0 голосов
/ 17 февраля 2019

Я искал поддержку соединителя kafka-connect-hdfs (Confluent) для сохранения массива байтов и разделения полей с использованием схемы FlatBuffer.

Я получаю данные в байтовом массиве от kafka.Этот байтовый массив генерируется из FlatBuffer.Необходимо сохранить его в HDFS по пути, скажем, Field1 / Field2 / Field3.Все эти поля должны быть извлечены из байтового массива с использованием схемы FlatBuffer.Кроме того, данные для сохранения в HDFS должны быть только в байтах.Для данных преобразование не требуется.

Я проверил оба:

  1. FieldPartitioner: https://github.com/confluentinc/kafka-connect-storage-common/blob/master/partitioner/src/main/java/io/confluent/connect/storage/partitioner/FieldPartitioner.java
  2. Поддерживаемые форматы: Json, Avro, Parquet.В https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/json/JsonRecordWriterProvider.java, я обнаружил, что байт-массив сохранен в HDFS, если данные имеют тип Kafka Struct.

Я не смог найти способ использовать их для своих целей.

Кто-нибудь знает о такой встроенной поддержке.Если нет, то, пожалуйста, направьте меня к ресурсу (если есть), чтобы создать пользовательскую поддержку для обоих.

1 Ответ

0 голосов
/ 22 февраля 2019

FlatBuffers (в настоящее время) не поддерживается форматом сериализации, и ByteArrayFormat доступен только для S3 Connect, но не для HDFS, и просто выдает формат ByteArraySerializer из Kafka (который будет объектом Struct после конвертера, да.

Что касается разделения, поскольку данные представляют собой только байты, они не проверяют значения записей для поддержки разделителей, поэтому вам также необходимо добавить пользовательские значения, которые потребуют десериализации.сообщения для проверки полей.

Я не уверен, почему вы связались с кодом подключения S3, но если вы хотите добавить свой собственный формат, посмотрите на PR, который добавил StringFormat к HDFS подключите


Чтобы построить проект, посмотрите FAQ

...