kafka-connect: ошибка в коннекторе раковины кассандры - PullRequest
1 голос
/ 10 мая 2019

Я получаю сообщение об ошибке выполнения для коннектора приемника кассандры, я пытаюсь выбрать форму данных kafka и сохранить ее в кассандре. Вы найдете стек ошибок ниже:

{
  "name": "cassandraSinkConnector2",
  "connector": {
    "state": "RUNNING",
    "worker_id": "localhost:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "localhost:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: Key must be a struct or map. This connector requires that records from Kafka contain the keys for the Cassandra table. Please use a transformation like org.apache.kafka.connect.transforms.ValueToKey to create a key with the proper fields.\n\tat io.confluent.connect.cassandra.CassandraSinkTask.put(CassandraSinkTask.java:94)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)\n\t... 10 more\n"
    }
  ],
  "type": "sink"
}

Я использовал приведенную ниже распределенную конфигурацию длямой разъем:


{
  "name": "cassandraSinkConnector2",
  "config": {
    "connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "appartenance_de",
    "cassandra.contact.points": "localhost",
    "cassandra.kcql": "INSERT INTO app_test SELECT * FROM app_de",
    "cassandra.port": "9042",
    "cassandra.keyspace": "dev_dkks",
    "cassandra.username": "superuser",
    "cassandra.password": "superuser",
    "cassandra.write.mode": "upsert",
    "value.converter.schemas.enable": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "transforms": "createKey,extractInt",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id",
    "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractInt.field": "id",
    "name": "cassandraSinkConnector2"
  },
  "tasks": [
    {
      "connector": "cassandraSinkConnector2",
      "task": 0
    }
  ],
  "type": "sink"

1 Ответ

1 голос
/ 10 мая 2019

За мой ответ здесь , ошибка, которую вы видите:

org.apache.kafka.connect.errors.DataException: 
Record with a null key was encountered.  This connector requires that records from Kafka contain the keys for the Cassandra table. 
Please use a transformation like org.apache.kafka.connect.transforms.ValueToKey to create a key with the proper fields.

Я бы предложил использовать Преобразование одного сообщения, как указано в ошибке, для правильного ввода ваших данных.Вы можете увидеть пример этого здесь и документацию для преобразования здесь .

...