Kafka-connect FileStreamSourceConnector не создает темы - PullRequest
0 голосов
/ 08 февраля 2019

Я пытаюсь создать соединитель Kafka-connect для передачи из темы AVRO в файл.

И затем восстановите этот файл в другой теме, используя kafka-connect.

Раковина работает нормально, я вижу, как растёт файл и читает данные.Но когда я пытаюсь восстановить новую тему, новая тема остается без данных ..

И я не получаю ошибок, я уже сбрасываю смещение, я создаю новый kafka-connect и пытаюсь восстановить,Я создаю полностью новый кластер Kafka и всегда один и тот же, без ошибок на исходном соединителе, но тема пуста.

Вот выходные данные конфигурации исходного соединителя:

{
  "name": "restored-exchange-rate-log",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "value.converter.schema.registry.url": "http://kafka-schema:8881",
    "file": "/tmp/exchange-rate-log.sink.txt",
    "format.include.keys": "true",
    "source.auto.offset.reset": "earliest",
    "tasks.max": "1",
    "value.converter.schemas.enable": "true",
    "name": "restored-exchange-rate-log",
    "topic": "restored-exchange-rate-log",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  },
  "tasks": [
    {
      "connector": "restored-exchange-rate-log",
      "task": 0
    }
  ],
  "type": "source"
}

Ивот выход состояния коннектора источника:

{
  "name": "restored-exchange-rate-log",
  "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8883"
  },
  "tasks": [
    {
      "state": "RUNNING",
      "id": 0,
      "worker_id": "kafka-connect:8883"
    }
  ],
  "type": "source"
}

вот конфиг выхода коннектора стока:

{
    "name": "bkp-exchange-rate-log",
    "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "source.auto.offset.reset": "earliest",
    "tasks.max": "1",
    "topics": "exchange-rate-log",
    "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.RecordNameStrategy",
    "value.converter.schema.registry.url": "http://kafka-schema:8881",
    "file": "/tmp/exchange-rate-log.sink.txt",
    "format.include.keys": "true",
    "value.converter.schemas.enable": "true",
    "name": "bkp-exchange-rate-log",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    },
    "tasks": [
    {
        "connector": "bkp-exchange-rate-log",
        "task": 0
    }
    ],
    "type": "sink"
}

вот выход состояния коннектора стока:

{
    "name": "bkp-exchange-rate-log",
    "connector": {
    "state": "RUNNING",
    "worker_id": "kafka-connect:8883"
    },
    "tasks": [
    {
        "state": "RUNNING",
        "id": 0,
        "worker_id": "kafka-connect:8883"
    }
    ],
    "type": "sink"
}

Файл приемника работает, постоянно растет, но тема восстановленного-обменного курса-журнала совершенно пуста.


Добавление дополнительных сведений.

Я пробовал сейчасчтобы сделать «Заландо», но мы не используем s3, мы используем коннектор FileStream.

Здесь Раковина:

{
  "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
  "file": "/tmp/exchange-rate-log.bin",
  "format.include.keys": "true",
  "tasks.max": "1",
  "topics": "exchange-rate-log",
  "format": "binary",
  "value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "name": "bkp-exchange-rate-log"
}

Здесь Источник:

{
  "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
  "file": "/tmp/exchange-rate-log.bin",
  "format.include.keys": "true",
  "tasks.max": "1",
  "format": "binary",
  "topic": "bin-test-exchange-rate-log",
  "value.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "key.converter": "com.spredfast.kafka.connect.s3.AlreadyBytesConverter",
  "name": "restore-exchange-rate-log"
}

Соединитель приемника выглядит нормально, приемник сгенерировал этот файл / tmp / exchange-rate-log.bin и увеличивается, но источник (восстановление) получает ошибку:

Caused by: org.apache.kafka.connect.errors.DataException: bin-test-exchange-rate-log error: Not a byte array! [B@761db301
    at com.spredfast.kafka.connect.s3.AlreadyBytesConverter.fromConnectData(AlreadyBytesConverter.java:22)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:269)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 11 more

Ответы [ 2 ]

0 голосов
/ 19 февраля 2019

Мне удалось создать «Дамп» темы с помощью kafka-avro-console-consumer.Мы используем SSL + Реестр схем.

Вот командная строка, чтобы можно было создать дамп темы:

tpc=exchange-rate-log
SCHEMA_REGISTRY_OPTS="-Djavax.net.ssl.keyStore=. -Djavax.net.ssl.trustStore=. -Djavax.net.ssl.keyStorePassword=. -Djavax.net.ssl.trustStorePassword=." \
kafka-avro-console-consumer \
  --from-beginning --bootstrap-server $CONNECT_BOOTSTRAP_SERVERS \
  --property schema.registry.url=$CONNECT_SCHEMA_REGISTRY_URL \
  --topic $tpc --consumer-property security.protocol=SSL \
  --consumer-property ssl.truststore.location=/etc/ssl/kafkaproducer.truststore.jks \
  --consumer-property ssl.truststore.password=$MYPASS \
  --consumer-property ssl.keystore.location=/etc/ssl/kafkaproducer.keystore.jks \
  --consumer-property ssl.keystore.password=$MYPASS \
  --consumer-property ssl.key.password=$MYPASS \
  --property "key.separator=::-::" \
  --property "schema.id.separator=::_::" \
  --property print.schema.ids=true \
  --timeout-ms 15000 \
  --property "print.key=true" \
  --key-deserializer "org.apache.kafka.common.serialization.StringDeserializer" > $tpc.dump

Но я не нашел способа импортироватьон вернулся с использованием kafka-avro-console -roduction, потому что он не работает с ключами не-avro.С помощью этого файла дампа я мог бы написать производителя Python, который читает этот файл и восстанавливает тему обратно.

0 голосов
/ 08 февраля 2019

Я не совсем уверен, что коннекторы Connect File - хороший вариант использования для этого.

Кроме того, конвертер Avro не будет выгружать файл в воспроизводимом формате.Это будет выглядеть как Struct{field=value}

Если вы действительно хотите сделать дамп в файл, просто наберите kafka-avro-console-consumer, включите ключ, передайте --key-deserializer в качестве строкового и запишите его, используя > file.txt

Чтобы восстановить, вы можете попытаться использовать производителя консоли Avro, но свойства строкового сериализатора нет, поэтому ключи необходимо заключать в кавычки, я думаю, они будут переданы в анализатор JSON

Вы можете проверить таким образом

echo '"1"|{"data":value"}'  > kafka-avro-console-producer...

(также необходимо установить свойство key.separator)

И создание файла будет выглядеть как

kafka-avro-console-producer...  < file.txt 

Чтобы это работало в случае, если весь Kafka Cluster исчезает, и у вас остается только этот файл, тогда вам также нужно будет сделать резервную копию вашей схемы Avro (поскольку Registry * 1023)* тема ушла)

...