kafka connect - преобразование ExtractTopic с коннектором приемника hdfs создает исключение NullPointerException - PullRequest
0 голосов
/ 25 января 2019

Я использую сливной hdfs-коннектор 5.0.0 с kafka 2.0.0, и мне нужно использовать преобразование ExtractTopic (https://docs.confluent.io/current/connect/transforms/extracttopic.html). Мой коннектор работает нормально, но когда я добавляю это преобразование, я получаю исключение NullPointerException, даже для простых данных выборка только с 2 атрибутами.

ERROR Task hive-table-test-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.NullPointerException
    at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:352)
    at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:464)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748) 

Вот конфигурация разъема:

name=hive-table-test
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=hive_table_test

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=${env.SCHEMA_REGISTRY_URL}
value.converter.schema.registry.url=${env.SCHEMA_REGISTRY_URL}
schema.compatibility=BACKWARD

# HDFS configuration
# Use store.url instead of hdfs.url (deprecated) in later versions. Property store.url does not work, yet
hdfs.url=${env.HDFS_URL}
hadoop.conf.dir=/etc/hadoop/conf
hadoop.home=/opt/cloudera/parcels/CDH/lib/hadoop
topics.dir=${env.HDFS_TOPICS_DIR}

# Connector configuration
format.class=io.confluent.connect.hdfs.avro.AvroFormat
flush.size=100
rotate.interval.ms=60000

# Hive integration
hive.integration=true
hive.metastore.uris=${env.HIVE_METASTORE_URIS}
hive.conf.dir=/etc/hive/conf
hive.home=/opt/cloudera/parcels/CDH/lib/hive
hive.database=kafka_connect

# Transformations
transforms=InsertMetadata, ExtractTopic
transforms.InsertMetadata.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertMetadata.partition.field=partition
transforms.InsertMetadata.offset.field=offset

transforms.ExtractTopic.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ExtractTopic.field=name
transforms.ExtractTopic.skip.missing.or.null=true

Я использую реестр схемы, данные в формате avro, и я уверен, что данный атрибут name не равен нулю. Какие-либо предложения? Что мне нужно, это в основном извлечь содержимое данного поля и использовать его в качестве названия темы.

EDIT:

Это происходит даже на простом json, как это в формате avro:

{
   "attr": "tmp",
   "name": "topic1"
}

1 Ответ

0 голосов
/ 26 января 2019

Краткий ответ потому, что вы изменили название темы в вашей трансформации.

Hdfs Connector для каждого раздела темы имеет отдельный TopicPartitionWriter. Когда SinkTask, отвечающий за обработку сообщений, создается в методе open(...) для каждого раздела TopicPartitionWriter.

Когда он обрабатывает SinkRecords, основываясь на имя темы и номер раздела , он ищет TopicPartitionWriter и пытается добавить запись в свой буфер. В вашем случае он не мог найти запись для сообщения. Преобразование изменило имя темы, и для этой пары (тема, раздел) не было создано ни одного TopicPartitionWriter.

SinkRecords, которые передаются в HdfsSinkTask::put(Collection<SinkRecord> records), уже имеют разделы и тему, поэтому вам не нужно применять какие-либо преобразования.

Я думаю io.confluent.connect.transforms.ExtractTopic лучше использовать для SourceConnector.

...