Kafka-Connect HDFS - Протобуф для паркета - PullRequest
0 голосов
/ 17 февраля 2019

Я пытаюсь использовать kafka-connect-hdfs, но, похоже, это не работает ..

Я пытался возиться с настройками, но, похоже, ничего не работает ..

Это схема сообщения Protobuf:

syntax = "proto3";
package com.company;
option java_package = "com.company";
option java_outer_classname = "MyObjectData";
import public "wrappers.proto";
message MyObject {
int64 site_id = 1;
string time_zone = 2;
uint64 dev_id = 3;
uint64 rep_id = 4;
uint64 dev_sn = 5;
UInt64Value timestamp = 6;
UInt32Value secs = 7;
UInt64Value man_id = 8;
FloatValue panv = 9;
FloatValue outputv = 10;
FloatValue panelc = 11;
FloatValue ereset = 12;
FloatValue temp = 13;
FloatValue tempin = 14;
FloatValue tempout = 15;
UInt32Value sectelem = 16;
FloatValue energytelem = 17;
UInt32Value ecode = 18;

}

Соединение connect-standalone.properties выглядит следующим образом:

bootstrap.servers=k1:9092,k2:9092,k3:9092


key.converter=org.apache.kafka.connect.storage.StringConverter

value.converter=com.blueapron.connect.protobuf.ProtobufConverter
value.converter.protoClassName=com.company.MyObjectData$MyObject
key.converter.schemas.enable=false
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

plugin.path=/usr/share/java

И quickstart-hdfs.properties:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=ObjectTopic
hadoop.conf.dir=/etc/hadoop
hdfs.url=hdfs://hdp-01:8020/user/hdfs/telems
hadoop.home=/etc/hadoop/client
flush.size=3
key.converter=org.apache.kafka.connect.storage.StringConverter

value.converter=com.blueapron.connect.protobuf.ProtobufConverter
value.converter.protoClassName=com.company.MyObjectData$MyObject

format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

transforms=SetSchemaName
transforms.SetSchemaName.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.SetSchemaName.schema.name=com.acme.avro.MyObject

В настоящее время я получаю следующую ошибку:

org.apache.kafka.connect.errors.ConnectException: Выход из WorkerSinkTask из-за неисправимого исключения.в org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages (WorkerSinkTask.java:586) в org.apache.kafka.connect.runtime.WorkerSinkTask.poll (WorkerSinkTask.java:32conkakakakakakakakakakakakakakakakakakakakakakakakaapka.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.ka.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.ka.ka.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.kaf.ka.ka.kaf.kaf.kaf.kaf.kaf.jp.runtime.WorkerSinkTask.iteration (WorkerSinkTask.java:225) в org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:193) в org.apache.kafka.connect.runask.java: 175) на org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:219) на java.util.concurrent.Executors $ RunnableAdapter.call (Executors.java:511) на java.util.concurrent.FutureTask.run (FutureTask.java:266) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) в java.util.concurrent.ThreadPoolExecutor $ Worker.run6 в java (Thread).lang.Thread.run (Thread.java:748) Причина: org.apache.avro.SchemaParseException: Невозможно переопределить: io.confluent.connect.avro.ConnectDefault на org.apache.avro.Schema $ Names.put(Schema.java:1128) в org.apache.avro.Schema $ NamedSchema.writeNameRef (Schema.java:562) в org.apache.avro.Schema $ RecordSchema.toJson (Schema.java:690) в org.apache.avro.Schema $ UnionSchema.toJson (схема).Java: 882) в org.apache.avro.Schema $ RecordSchema.fieldsToJson (Schema.java:716) в org.apache.avro.Schema $ RecordSchema.toJson (Schema.java:701) в org.apache.avro.Schema.toString (Schema.java:324) в org.apache.avro.Schema.toString (Schema.java:314) в org.apache.parquet.avro.AvroWriteSupport.init (AvroWriteSupport.java:133) в org.apache.parquet.hadoop.ParquetWriter. (ParquetWriter.java:270) в org.apache.parquet.hadoop.ParquetWriter. (ParquetWriter.java:222) в org.apache.parquet.hadoop.ParquetWriter. (ParquetWriter.java:8)org.apache.parquet.avro.AvroParquetWriter. (AvroParquetWriter.java:131) в org.apache.parquet.avro.AvroParquetWriter. (AvroParquetWriter.java:106) в io.confluent.connect.hriterParquetParts(ParquetRecordWriterProvider.java:75) по адресу io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord (TopicPartitionWriter.java:643) по адресу io.confluent.connect.hdfs.TopicPartitionWriter.write (TopicPartitionWriter.java:379) по адресу io.confluent.connect.hdfs.DataWriter.write (DataWriter.jrite ido3)..confluent.connect.hdfs.HdfsSinkTask.put (HdfsSinkTask.java:109) в org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages (WorkerSinkTask.java:564) * также 1017 *

1019это важно, я делаю это с помощью пользователя hdfs

Это проблема схемы?Кажется, что ничего, что я делаю, даже не меняет сообщение об ошибке ...

1 Ответ

0 голосов
/ 18 февраля 2019

Возможно, Can't redefine: io.confluent.connect.avro.ConnectDefault потому что ваше преобразование устанавливает свойство схемы.

Вы также можете попробовать использовать AvroFormat, который будет принимать внутренний объект схемы и структуры Connect и записывать файлы Avro в HDFS.

Обратите внимание, ParquetFormat использует проект parquet-avro , поэтому для начала, вероятно, следует использовать данные Avro.

Обратите внимание на Stacktrace.

org.apache.avro.SchemaParseException ...

...

org.apache.avro.Schema $ Имена.положить (Schema.java:1128) в org.apache.avro.Schema $ NamedSchema.writeNameRef (Schema.java:562) в org.apache.avro.Schema $ RecordSchema.toJson (Schema.java:690) в org.apache.avro.Schema $ UnionSchema.toJson (Schema.java:882) в org.apache.avro.Schema $ RecordSchema.fieldsToJson (Schema.java:716) в org.apache.avro.Schema $ RecordSchema.toJson (Schema.java): 701) в org.apache.avro.Schema.toString (Schema.java:324) в org.apache.avro.Schema.toString (Schema.java:314) в org.apache.parquet.avro.AvroWriteSupport.init (AvroWriteSupport.java:133) в org.apache.parquet.hadoop.ParquetWriter. (ParquetWriter.java:270) в org.apache.parquet.hadoop.ParquetWriter. (ParquetWriter.java:222) в org.apache.parquet.h.ParquetWriter. (ParquetWriter.java:188) в org.apache.parquet.avro.AvroParquetWriter. (AvroParquetWriter.java:131) в org.apache.parquet.avro.AvroParquetWriter. (AvroParquetWriter.java: 106)

Поэтому вам нужно где-то написать конвертер protofuf-avro.Возможно использование skeuomorph

  1. Kafka Streams или аналогичного процесса между вашим производителем и Connect (самый простой из этих вариантов)
  2. Изменение kafka-connect-hdfsпроект, чтобы Protobuf мог быть обработан
  3. Измените код ProtobufConverter так, чтобы он генерировал ConnectRecord данных Avro

Если ничего не помогло, вы могли бы файлвопрос об этом и посмотрим, что вы получите.

...