Невозможно использовать эластичный соединитель раковины (kafka-connect) - PullRequest
0 голосов
/ 11 декабря 2019

В настоящее время я пытаюсь запустить соединитель приемника эластичного поиска в кластере kafka-connect (распределенный режим). Этот кластер развернут в kubernetes с использованием диаграмм руля, предоставленных слияниями с некоторыми изменениями в нем. Вот соответствующие части:

Для values.yaml

configurationOverrides:
  "plugin.path": "/usr/share/java,/usr/share/confluent-hub-components"
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  "key.converter.schemas.enable": "false"
  "value.converter.schemas.enable": "false"
  "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter"
  "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter"
  "config.storage.replication.factor": "3"
  "offset.storage.replication.factor": "3"
  "status.storage.replication.factor": "3"
  "security.protocol": SASL_SSL
  "sasl.mechanism": SCRAM-SHA-256

А для части кластера kube:

releases:
  - name: kafka-connect
    tillerless: true
    tillerNamespace: qa3-search
    chart: ../charts/cp-kafka-connect
    namespace: qa3-search
    values:
      - replicaCount: 2
      - configurationOverrides:
          config.storage.topic: kafkaconnectKApp_connect-config_private_json
          offset.storage.topic: kafkaconnectKApp_connect-offsets_private_json
          status.storage.topic: kafkaconnectKApp_connect-statuses_private_json
          connect.producer.client_id: "connect-worker-producerID"
          groupId: "kafka-connect-group-ID"
          log4j.root.loglevel: "INFO"
          bootstrap_servers: "SASL_SSL://SOME_ACCESSIBLE_URL:9094"
          client.security.protocol: SASL_SSL
          client.sasl.mechanism: SCRAM-SHA-256
      - prometheus:
          jmx:
            enabled: false
      - ingress:
          enabled: true
          hosts:
            - host: kafka-connect.qa3.k8s.XXX.lan
              paths:
                - /
      - cp-schema-registry:
          url: "https://SOME_ACCESSIBLE_URL"

Затем загружаю разъем приемника эластичного поиска как таковой:

curl -X POST -H 'Content-Type: application/json' http://kafka-connect.qa3.k8s.XXX.lan/connectors -d '{
"name": "similarads3",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
"topics": "SOME_TOPIC_THAT_EXIST",
"topic.index.map": "SOME_TOPIC_THAT_EXIST:test_similar3",
"connection.url": "http://vqa38:9200",
"batch.size": 1,
"type.name": "similads",
"key.ignore": true,
"errors.log.enable": true,
"errors.log.include.messages": true,
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "SOME_ACCESSIBLE_URL",
"schema.ignore": true
}
}' -vvv

Более того, я загружаю имя пользователя и пароль для аутентификации брокеров через переменную окружения, и я почти уверен, что это связано с правами ACL ...

Что беспокоитмне, в том, что при запуске коннектора нет создания индекса, и в логах kafka-connect нет ошибок, что когда-либо происходило ... И это говорит о том, что все началось

Starting connectors and tasks using config offset 68

При запуске скручивания/ connectors / similarads3 / status, все работает, без ошибок.

Похоже, я что-то упустил, но не могу понять, чего не хватает. Когда я проверяю отставание потребителей по этим конкретным темам, мне кажется, что нигде нет сообщений, где они потребляются.

Если информации недостаточно, я могу предоставить больше. У кого-нибудь есть идея?

РЕДАКТИРОВАТЬ: я должен был упомянуть, что я пытался настроить его с темой, которая не существует: опять же нет ошибок в журналах. (Я не знаю, как это интерпретировать)

РЕДАКТИРОВАТЬ 2: Эта проблема решена На самом деле мы нашли проблему, и кажется, что я что-то упустил: чтобы прочитать изТемой, защищенной правами ACL, необходимо предоставить конфигурацию SASL как для соединителя, так и для потребителя приемника. Так что просто дублирование конфигурации с префиксом consumer. решило эту проблему. Однако я все еще удивлен, что никакие журналы не могут указать на это.

1 Ответ

1 голос
/ 11 декабря 2019

У нас были проблемы при попытке использовать свойство topic.index.map. Даже если вы работаете, в документах есть примечание, что оно устарело.

topic.index.map
This option is now deprecated. A future version may remove it completely. Please use single message transforms, such as RegexRouter, to map topic names to index names.

Я бы попробовал использовать RegexRouter , чтобы выполнить это вместо этого.

"transforms": "renameTopicToIndex",
"transforms.renameTopicToIndex.type": "org.apache.kafka.connect.transforms.RegexRouter"
"transforms.renameTopicToIndex.regex": ".*"
"transforms.renameTopicToIndex.replacement": "test_similar3"
...