Kafka Connect экспортирует несколько типов событий из одной темы - PullRequest
0 голосов
/ 23 мая 2018

Я пытаюсь использовать новую функцию (https://www.confluent.io/blog/put-several-event-types-kafka-topic/) в отношении хранения двух разных типов событий на одну и ту же тему. На самом деле я использую Confluent версии 4.1.0 и задаю эти свойства ниже, чтобы это произошло

properties.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY,TopicRecordNameStrategy.class.getName());
properties.put("value.multi.type", true);

Данные записываются в тему без проблем и могут рассматриваться в приложении Kafka Streams как общие записи Avro. Также в реестре схемы Kafka создаются две новые записи, по одной для каждого события, размещенного в этой конкретной теме.

Проблема, с которой я сталкиваюсь, заключается в том, что я не могу экспортировать эти данные из этой темы с помощью Kafka Connect. В простейшем случае, когда я использую File Sink Connector, как показано ниже

{
  "name": "sink-connector",
  "config": {
      "topics": "source-topic",
      "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
      "tasks.max": 1,
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schema.registry.url":"http://kafka-schema-registry:8081",
      "value.converter":"io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url":"http://kafka-schema-registry:8081",
      "value.subject.name.strategy":"io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
      "file": "/tmp/sink-file.txt"
    }
}

Я получаю ошибкуиз коннектора, который, по-видимому, является некоторой ошибкой сериализации на основе AvroConverter, как показано здесь

org.apache.kafka.connect.errors.DataException: source-topic
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:95)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 2
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:202)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:296)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:284)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:125)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:236)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:152)
    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:120)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:83)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Обратите внимание, что в реестре схемы есть схема Avro с идентификатором 2, а другая - с идентификатором схемы 3, описывающая дваСобытия, размещенные в той же теме. Те же проблемы возникают при использовании JDBC-коннектора.

So как мне справиться с этим делом, чтобы экспортировать данные из моего кластера Kafka во внешнюю систему.Я что-то упустил в моей конфигурации?Можно ли создать тему с несколькими типами событий и экспортировать их через Kafka Connect?

1 Ответ

0 голосов
/ 04 июня 2018

Нашел решение.Мой код передавал ключ как String и значение как avro.При чтении куст-улей попытался найти схему ключа avro и не смог ее найти.Добавление свойства key.converter = org.apache.kafka.connect.storage.StringConverter key.converter.schema.registry.url = http://localhost:8081 помогло решить проблему.

...