Отсутствует необходимая конфигурация "key.converter", которая не имеет значения по умолчанию - PullRequest
0 голосов
/ 01 мая 2018

Когда я пытаюсь запустить Kafka connect для эластичного поискового реактора, в автономном режиме я получаю следующую ошибку:

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "key.converter" which has no default value.
        at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:463)
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
        at org.apache.kafka.connect.runtime.WorkerConfig.<init>(WorkerConfig.java:218)
        at org.apache.kafka.connect.runtime.distributed.DistributedConfig.<init>(DistributedConfig.java:272)
        at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:72)

Могу ли я решить эту ошибку?

РЕДАКТИРОВАТЬ 01/05/2018 Извините, я стараюсь быть более конкретным. Использую разъем реактора потока: https://github.com/Landoop/stream-reactor Это команда, которую я запускаю из экземпляра EC2, в котором есть уникальный брокер моей кафки:

./bin/connect-standalone.sh config/elastic-config.properties config/connect- 
standalone.properties.

В порядке, это connect-standalone.properties:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some 
settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it 
into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when 
loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's 
setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# The internal converter used for offsets and config data is configurable 
and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never 
visible outside of Copcyat in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
plugin.path=/home/ubuntu/kafka_2.11-1.0.1/libs

А это другой файл:

name=elasticsearch-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=test
    topic.index.map=test:test_index
    connection.url=myurl
    type.name=log
    key.ignore=true
    schema.ignore=true

1 Ответ

0 голосов
/ 01 мая 2018

Ошибка вроде говорит обо всем. Вам не хватает обязательной записи конфигурации для key.converter. Это сообщает Kafka Connect, как десериализовать данные по теме Kafka (обычно JSON или Avro).

Пример допустимой конфигурации соединителя для Elasticsearch можно посмотреть здесь, в this gist . Если вы обновите свой вопрос, включив в него конфигурацию, которую вы используете, я могу указать, как ее включить.


После просмотра вашей конфигурации причина вашей ошибки заключается в том, что вы вызываете Connect с вашими файлами конфигурации в неправильном порядке, и, следовательно, Connect не может найти ожидаемую конфигурацию.

Должно быть:

./bin/connect-standalone.sh config/connect-standalone.properties config/elastic-config.properties

Подробнее о потоковой передаче с Kafka на Elasticsearch можно прочитать в этой статье и этой серии статей об использовании Kafka Connect:

...