Как решить проблему в "schema.registry.url" при подключении поиска elasti c с Kafka Connect? - PullRequest
0 голосов
/ 21 января 2020

Я пытаюсь подключить Kafka Connect с помощью elasti c search. Я не использую в слияния, но в автономном режиме. Это моя конфигурация соединителя эластичного поиска.

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=mysql-jdbc-mall
key.ignore=true
schema.ignore=true
connection.url=http://172.**.*.**:5601
type.name=kafka-connect
elastic.security.protocol=SSL
key.converter.schemas.enable=false
value.converter.schemas.enable=false

Мои connect-standalone.properties:

bootstrap.servers=Ni****ing:9092
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java

И когда я запускаю соединитель, я получаю проблему как.

[2020-01-21 09:31:03,676] ERROR Failed to start task elasticsearch-sink-0 (org.apache.kafka.connect.runtime.Worker:464)
io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
        at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
        at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:100)
        at io.confluent.connect.avro.AvroConverterConfig.<init>(AvroConverterConfig.java:27)
        at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:58)
        at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:268)
        at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:440)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.createConnectorTasks(StandaloneHerder.java:311)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:336)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:214)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115)
[2020-01-21 09:31:03,677] INFO Created connector elasticsearch-sink (org.apache.kafka.connect.cli.ConnectStandalone:112)

Обновление

Поскольку у меня нет файла / etc / schema-registry. Я изменил свой connect-standalone.properties на

bootstrap.servers=Nifi-Staging:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java

, но когда я использую JSONConverter, я получаю эту ошибку.

[2020-01-21 16:12:04,939] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
java.lang.NullPointerException
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.getServerVersion(JestElasticsearchClient.java:231)
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:142)
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:133)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:122)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:51)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:300)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2020-01-21 16:12:04,946] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-01-21 16:12:04,946] INFO Stopping ElasticsearchSinkTask (io.confluent.connect.elasticsearch.ElasticsearchSinkTask:190)

1 Ответ

1 голос
/ 21 января 2020

io.confluent.connect.avro.AvroConverter - это требуется для определения schema.registry.url

Удалите оба реквизита schemas.enable, поскольку они применяются только к JSON; Avro всегда имеет схему , затем вместо этого добавьте URL-адреса

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://...
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://...

Пример файла свойств Connect можно найти в папке etc/schema-registry

Если вы не Используя Avro, измените конвертер, чтобы он соответствовал вашим данным. Ключ и значение тоже могут быть совершенно разных типов


Кроме того, urlasticsearch должен быть другим; например, что-то, работающее на порте 9200, а не на кибане на 5601


Я не использую в слиянии, но в автономном режиме.

Я предполагаю, что вы имеете в виду confluent команда? Это просто запускает kafka-connect-distributed для вас, и распределенный режим на самом деле предпочтительнее.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...