Я пытаюсь создать соединитель Kafka-connect для передачи из темы AVRO в файл.
И затем восстановите этот файл в другой теме, используя kafka-connect.
Раковина работает нормально, я вижу, как растёт файл и читает данные.Но когда я пытаюсь восстановить новую тему, новая тема остается без данных ..
И я не получаю ошибок, я уже сбрасываю смещение, я создаю новый kafka-connect и пытаюсь восстановить,Я создаю полностью новый кластер Kafka и всегда один и тот же, без ошибок на исходном соединителе, но тема пуста.
Вот выходные данные конфигурации исходного соединителя:
{
"name": "restored-exchange-rate-log",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"value.converter.schema.registry.url": "http://kafka-schema:8881",
"file": "/tmp/exchange-rate-log.sink.txt",
"format.include.keys": "true",
"source.auto.offset.reset": "earliest",
"tasks.max": "1",
"value.converter.schemas.enable": "true",
"name": "restored-exchange-rate-log",
"topic": "restored-exchange-rate-log",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
},
"tasks": [
{
"connector": "restored-exchange-rate-log",
"task": 0
}
],
"type": "source"
}
Ивот выход состояния коннектора источника:
{
"name": "restored-exchange-rate-log",
"connector": {
"state": "RUNNING",
"worker_id": "kafka-connect:8883"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "kafka-connect:8883"
}
],
"type": "source"
}
вот конфиг выхода коннектора стока:
{
"name": "bkp-exchange-rate-log",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"source.auto.offset.reset": "earliest",
"tasks.max": "1",
"topics": "exchange-rate-log",
"value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy",
"value.converter.schema.registry.url": "http://kafka-schema:8881",
"file": "/tmp/exchange-rate-log.sink.txt",
"format.include.keys": "true",
"value.converter.schemas.enable": "true",
"name": "bkp-exchange-rate-log",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
},
"tasks": [
{
"connector": "bkp-exchange-rate-log",
"task": 0
}
],
"type": "sink"
}
вот выход состояния коннектора стока:
{
"name": "bkp-exchange-rate-log",
"connector": {
"state": "RUNNING",
"worker_id": "kafka-connect:8883"
},
"tasks": [
{
"state": "RUNNING",
"id": 0,
"worker_id": "kafka-connect:8883"
}
],
"type": "sink"
}
Файл приемника работает, постоянно растет, но тема восстановленного-обменного курса-журнала совершенно пуста.
Добавление дополнительных сведений.
Я пробовал сейчасчтобы сделать «Заландо», но мы не используем s3, мы используем коннектор FileStream.
Здесь Раковина:
{
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"file": "/tmp/exchange-rate-log.bin",
"format.include.keys": "true",
"tasks.max": "1",
"topics": "exchange-rate-log",
"format": "binary",
"value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
"key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
"name": "bkp-exchange-rate-log"
}
Здесь Источник:
{
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"file": "/tmp/exchange-rate-log.bin",
"format.include.keys": "true",
"tasks.max": "1",
"format": "binary",
"topic": "bin-test-exchange-rate-log",
"value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
"key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
"name": "restore-exchange-rate-log"
}
Соединитель приемника выглядит нормально, приемник сгенерировал этот файл / tmp / exchange-rate-log.bin и увеличивается, но источник (восстановление) получает ошибку:
Caused by: org.apache.kafka.connect.errors.DataException: bin-test-exchange-rate-log error: Not a byte array! [B@761db301
at com.spredfast.kafka.connect.s3.AlreadyBytesConverter.fromConnectData(AlreadyBytesConverter.java:22)
at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:269)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 11 more