kafka connect для раковины jdbc для сервера sql microsoft. это работает для нескольких ключей для record_value, и эта ошибка появляется для record_key - PullRequest
1 голос
/ 30 апреля 2019

Я использовал драйвер мойки jdbc от kafka connect. это позволяет создать таблицу с одним первичным ключом, когда я пытаюсь добавить 2 поля pk.key. это дает мне ошибку:

java.lang.NullPointerException
        at io.confluent.connect.jdbc.util.TableDefinitions.refresh(TableDefinitions.java:86)
        at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:65)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:85)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

работал с первичным ключом

1 Ответ

0 голосов
/ 14 мая 2019
My kafka connect configuration    
bootstrap.servers=localhost:9092
    group.id=connect-cluster
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    avro.compatibility.level=none
    auto.register.schemas=true
    config.storage.topic=connect-configs
    offset.storage.topic=connect-offsets
    status.storage.topic=connect-statuses
    config.storage.replication.factor=1
    offset.storage.replication.factor=1
    status.storage.replication.factor=1
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    rest.host.name=kafka01.xxxxxxxxx.com
    rest.port=8083
    plugin.path=xxx/kafka/confluent-5.2.1/share/java,xxxx/kafka/confluent-5.2.1/share/java
    producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
...