зарегистрировать схему от Json при использовании кафки AvroConverter - PullRequest
1 голос
/ 25 марта 2020

Я пытаюсь отразить топи c из исходного кластера Kafka в целевой кластер Kafka, используя эту открытую библиотеку COMCAST . Мой источник topi c имеет полезную нагрузку json вместе со схемой, и я пытаюсь отразить этот topi c для конечного кафки. Также я конвертирую его в avro во время записи в топи c. Следовательно, я использую Kafka Avro Converter avro converter .

source kafka topi c содержит следующую полезную нагрузку вместе со схемой

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "name"
      },
      {
        "type": "string",
        "optional": true,
        "field": "email"
      },
      {
        "type": "string",
        "optional": true,
        "field": "department"
      },
      {
        "type": "int64",
        "optional": false,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "updated_at"
      }
    ],
    "optional": false,
    "name": "test"
  },
  "payload": {
    "id": 7,
    "name": "harry",
    "email": "harry@gmail.com",
    "department": "sales",
    "updated_at": 1584976391000
  }
}

ниже мой коннектор kafka curl для

curl -X POST -H "Content-Type: application/json" --data '{
  "name": "kafka-connect-kafka-source-example-source-3",
  "config": {
    "tasks.max": "2",
    "connector.class": "com.comcast.kafka.connect.kafka.KafkaSourceConnector", 
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "source.bootstrap.servers": "localhost:9092",
    "source.topic.whitelist": "topic_test",
    "topic.preserve.partitions": "true",
    "source.auto.offset.reset": "earliest",
    "source.group.id": "kafka-connect-testing-3",
    "connector.consumer.reconnect.backoff.max.ms": "10000",
    "key.converter.schema.registry.url": "http://localhost:8081/",
    "value.converter.schema.registry.url": "http://localhost:8081/",
  }
}' http://localhost:8083/connectors

problem- Он отражает topi c, но не конвертирует полезную нагрузку в формат avro. И зарегистрированная схема выглядит следующим образом.

{"subject":"topic_test-value","version":1,"id":1,"schema":"[\"null\",\"bytes\"]"}

1 Ответ

1 голос
/ 25 марта 2020

Глядя на документацию по разъему, я думаю, что вам, вероятно, следует установить source.value.deserializer на org.apache.kafka.connect.json.JsonConverter и предположить, что он соответствует стандартному набору Kafka Connect source.value.deserializer.schemas.enable на true.

Вы Возможно, вы захотите взглянуть на MirrorMaker 2, который теперь является частью Apache Kafka и предоставляет аналогичные функции репликации на основе Kafka Connect.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...