Я пытаюсь отразить топи c из исходного кластера Kafka в целевой кластер Kafka, используя эту открытую библиотеку COMCAST . Мой источник topi c имеет полезную нагрузку json вместе со схемой, и я пытаюсь отразить этот topi c для конечного кафки. Также я конвертирую его в avro во время записи в топи c. Следовательно, я использую Kafka Avro Converter avro converter .
source kafka topi c содержит следующую полезную нагрузку вместе со схемой
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int64",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "name"
},
{
"type": "string",
"optional": true,
"field": "email"
},
{
"type": "string",
"optional": true,
"field": "department"
},
{
"type": "int64",
"optional": false,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "updated_at"
}
],
"optional": false,
"name": "test"
},
"payload": {
"id": 7,
"name": "harry",
"email": "harry@gmail.com",
"department": "sales",
"updated_at": 1584976391000
}
}
ниже мой коннектор kafka curl для
curl -X POST -H "Content-Type: application/json" --data '{
"name": "kafka-connect-kafka-source-example-source-3",
"config": {
"tasks.max": "2",
"connector.class": "com.comcast.kafka.connect.kafka.KafkaSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"source.bootstrap.servers": "localhost:9092",
"source.topic.whitelist": "topic_test",
"topic.preserve.partitions": "true",
"source.auto.offset.reset": "earliest",
"source.group.id": "kafka-connect-testing-3",
"connector.consumer.reconnect.backoff.max.ms": "10000",
"key.converter.schema.registry.url": "http://localhost:8081/",
"value.converter.schema.registry.url": "http://localhost:8081/",
}
}' http://localhost:8083/connectors
problem- Он отражает topi c, но не конвертирует полезную нагрузку в формат avro. И зарегистрированная схема выглядит следующим образом.
{"subject":"topic_test-value","version":1,"id":1,"schema":"[\"null\",\"bytes\"]"}