Ошибка при использовании org.apache.kafka.connect.json.JsonConverter в Kafka Connect - PullRequest
0 голосов
/ 08 апреля 2019

Я пытаюсь использовать конвертер Json в Kafka connect, но выдает ошибку ниже:

{"type":"log", "host":"connecttest6-ckaf-connect-84866788d4-p8lkh", "level":"ERROR", "neid":"kafka-connect-4d9495b82e1e420992ec44c433d733ad", "system":"kafka-connect", "time":"2019-04-08T11:55:14.254Z", "timezone":"UTC", "log":"pool-5-thread-1 - org.apache.kafka.connect.runtime.WorkerTask - WorkerSinkTask{id=hive-sink6-1} Task threw an uncaught and unrecoverable exception"}
java.lang.ClassCastException: org.apache.kafka.connect.json.JsonConverter cannot be cast to io.confluent.connect.hdfs.Format
        at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:242)
        at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:103)
        at io.confluent.connect.hdfs.HdfsSinkTask.start(HdfsSinkTask.java:98)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:302)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:191)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Я попытался выполнить следующую настройку для Json (key.converter.enable.schema = false и value.converter.enable.schema = false) в исходном коде и то же в конфигурации HDFSSinkConnector.

Конфигурация подключения:

 ConnectKeyConverter: "org.apache.kafka.connect.json.JsonConverter"
  ConnectValueConverter: "org.apache.kafka.connect.json.JsonConverter"
  ConnectKeyConverterSchemasEnable: "true"
  ConnectValueConverterSchemasEnable: "true"
 "http(s)://schemareg_headless_service_name.namespace.svc.cluster.local:port"
  ConnectSchemaRegistryUrl: "http://kafka-schema-registry-ckaf-schema-registry-headless.ckaf.svc.cluster.local:8081"
  ConnectInternalKeyConverter: "org.apache.kafka.connect.json.JsonConverter"
  ConnectInternalValueConverter: "org.apache.kafka.connect.json.JsonConverter"

Команда API REST, используемая для добавления приемника (Конфигурация приемника):

curl -X PUT -H "Content-Type: application/json" --data '{"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector","tasks.max":"2","topics":"topic-test","hdfs.url": "hdfs://localhost/tmp/jsontest4","flush.size": "3","name": "thive-sink6","format.class":"org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","key.converter.schemas.enable":"false"}' connecttest6-ckaf-connect.ckaf.svc.cluster.local:8083/connectors/hive-sink6/config

После добавления раковины в Kafka Connect. Я отправил данные в соответствующую тему Kafka. Ниже приведены данные, которые я пробовал:

{"name":"test"}
{"schema":{"type":"struct","fields":[{"type":"string","field":"name"}]},"payload":{"name":"value1"}}

Ожидается, что данные будут записаны в папку HDFS, указанную в указанной выше конфигурации раковины.

Нужны предложения по описанному выше сценарию и способы устранения ошибки.

1 Ответ

0 голосов
/ 09 апреля 2019

Похоже, что где-то в ваших конфигурациях (для Kubernetes ??) вы назначили format.class=org.apache.kafka.connect.json.JsonConverter, что недопустимо.

Возможно, вы хотели использовать io.confluent.connect.hdfs.json.JsonFormat

...