Данные раздела Kafka в файл паркета HDFS с использованием проблемы конфигурации соединителя приемника HDFS - PullRequest
0 голосов
/ 17 февраля 2019

Мне нужна помощь по теме kafka, которую я хотел бы поместить в HDFS в формате паркет (с ежедневным разделителем).

У меня много данных в теме kafka, которые в основном представляют собой данные json, подобные этой:

{"title":"Die Hard","year":1988,"cast":["Bruce Willis","Alan Rickman","Bonnie Bedelia","William Atherton","Paul Gleason","Reginald VelJohnson","Alexander Godunov"],"genres":["Action"]}
{"title":"Toy Story","year":1995,"cast":["Tim Allen","Tom Hanks","(voices)"],"genres":["Animated"]}
{"title":"Jurassic Park","year":1993,"cast":["Sam Neill","Laura Dern","Jeff Goldblum","Richard Attenborough"],"genres":["Adventure"]}
{"title":"The Lord of the Rings: The Fellowship of the Ring","year":2001,"cast":["Elijah Wood","Ian McKellen","Liv Tyler","Sean Astin","Viggo Mortensen","Orlando Bloom","Sean Bean","Hugo Weaving","Ian Holm"],"genres":["Fantasy »]}
{"title":"The Matrix","year":1999,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}

Название этой темы: test

И я хотел бы поместить эти данные в мой кластер HDFS в формате паркета.Но я борюсь с конфигурацией разъема мойки.Для этого я использую сливной коннектор hdfs-раковина.

Вот что мне удалось сделать до сих пор:

{
  "name": "hdfs-sink",
  "config": {
    "name": "hdfs-sink",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "test",
    "hdfs.url": "hdfs://hdfs-IP:8020",
    "hadoop.home": "/user/test-user/TEST",
    "flush.size": "3",
    "locale": "fr-fr",
    "timezone": "UTC",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.hdfs.partitioner.DailyPartitioner",
    "consumer.auto.offset.reset": "earliest",
    "value.converter":  "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true"

  }
}

Некоторое объяснение того, почему я настроил разъем таким образом:

  • У меня много таких данных, которые ежедневно заполняют мою тему
  • Конечная цель - иметь один файл паркета в день в моей HDFS для этой темы

Я понял, что, возможно, мне нужно использовать системный реестр для форматирования данных в паркет, но я не знаю, как это сделать.И нужно ли это?

Не могли бы вы мне помочь с этим?

Спасибо

1 Ответ

0 голосов
/ 18 февраля 2019

Я лично не использовал ParquetFormat, но либо

  1. Должно быть из данных Avro (из-за собственного проекта Parquet-Avro).И так, вместо этого необходимо установить AvroConverter и добавить свойство value.converter.schema.registry.url, которое требует, чтобы вы запустили и установили реестр Confluent Schema, да.
  2. Вы должны использовать специальный формат JSON Kafka Connect, который включает схему в ваши записи .Это не может быть "простой JSON".Т.е. у вас сейчас "value.converter.schemas.enable": "true", и я предполагаю, что ваш коннектор не работает, потому что ваши записи не в указанном выше формате.

По сути, без схемы анализатор JSON не может знать, какие "столбцы" нужно написать Паркету.


И Daily Partitioner не создает один файл в день, только каталог.Вы получите один файл на flush.size, а также есть конфигурация для запланированных интервалов поворота файлов очистки.Кроме того, будет один файл на раздел Kafka.


Кроме того, "consumer.auto.offset.reset": "earliest", работает только в файле connect-distribtued.properties, а не на основе для каждого соединителя, AFAIK.


Поскольку я лично не использовал ParquetFormat, это все, что я могу дать, но я использовал другие инструменты, такие как NiFi , для аналогичных целей, которые позволятВы не должны изменять свой существующий код производителя Kafka.


В качестве альтернативы используйте вместо этого JSONFormat, однако интеграция Hive не будет работать автоматически, и таблицы должны быть предварительно определены (что в любом случае потребует наличия схемы для вашей темы).


И еще один вариант - просто настроить Hive для чтения непосредственно из Kafka

...