org. apache .kafka.common.errors.TimeoutException: истекло время ожидания при извлечении метаданных topi c для кластера Kafka с использованием аутентификации конфигурации SASL jaas - PullRequest
1 голос
/ 24 апреля 2020

Я пытаюсь развернуть конвейер потока данных облака Google, который читает из кластера Kafka, обрабатывает его записи и затем записывает результаты в BigQuery. Однако при попытке развертывания я продолжаю сталкиваться со следующим исключением:

org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata for Kafka Cluster

Для кластера Kafka требуется использовать конфигурацию JAAS для аутентификации, и я использую приведенный ниже код для установки свойств, необходимых для KafkaIO. читать Apache Метод луча:

// Kafka properties
    Map<String, Object> kafkaProperties = new HashMap<String, Object>(){{
        put("request.timeout.ms", 900000);
        put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
        put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"USERNAME\" password=\"PASSWORD\";");
        put(CommonClientConfigs.GROUP_ID_CONFIG, GROUP_ID);
    }};

    // Build & execute pipeline
    pipeline
            .apply(
                    "ReadFromKafka",
                    KafkaIO.<Long, String>read()
                            .withBootstrapServers(properties.getProperty("kafka.servers"))
                            .withKeyDeserializer(LongDeserializer.class)
                            .withValueDeserializer(StringDeserializer.class)
                            .withTopic(properties.getProperty("kafka.topic")).withConsumerConfigUpdates(kafkaProperties))

Конвейер потока данных должен быть развернут с отключенными публичными c IP-адресами, но существует установленный VPN-туннель от нашей сети Google Cloud VP C до Кафки Кластер и требуемая маршрутизация для частных ips на обеих сторонах настроены, и их IP-адреса внесены в белый список. Я могу пропинговать и подключиться к сокету сервера Kafka, используя виртуальную машину Compute Engine в той же подсети VPN, что и развертываемое задание потока данных.

Я думал, что есть проблема с конфигурацией, но я не могу выяснить, если я пропускаю дополнительное поле или одно из существующих настроено неправильно. Кто-нибудь знает, как я могу диагностировать проблему дальше, так как выброшенное исключение действительно не определяет проблему? Любая помощь будет принята с благодарностью.

Редактировать: Теперь я могу успешно развернуть задание потока данных, однако кажется, что чтение не работает правильно. После просмотра журналов, чтобы проверить ошибки в задании потока данных, я вижу, что после обнаружения координатора группы для kafka topi c, нет никаких других операторов журнала до оператора журнала предупреждений о том, что закрытие простоя читателя тайм-аут:

Close timed out with 1 pending requests to coordinator, terminating client connections

, за которым следует неперехваченное исключение с root причиной:

org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition test-partition could be determined

При этом возникает ошибка:

Execution of work for P0 for key 0000000000000001 failed. Will retry locally.

Может ли это быть проблемой с определением ключа, так как темы Kafka фактически не имеют ключей в сообщениях? Когда я просматриваю топи c в Kafka Tool, единственные столбцы, наблюдаемые в данных, состоят из смещения, сообщения и метки времени.

1 Ответ

0 голосов
/ 06 мая 2020

Исходя из последнего комментария, я предполагаю, что вы испытываете больше проблем с сетевым стеком, чем изначально пытаетесь найти какие-либо недостатки конфигурации в конвейере Dataflow с точки зрения выполнения подключений исполнителей заданий Dataflow к брокерам Kafka.

В основном, когда вы используете Publi c IP пул адресов для работников Dataflow, у вас есть самый простой способ получить доступ к внешнему кластеру Kafka без дополнительной настройки, применимой с обеих сторон, так как вы не нужно запускать VP C сеть между сторонами и выполнять обычную сетевую работу для обеспечения работы всех маршрутов.

Однако Cloud VPN приносит еще больше сложностей реализация сети VP C с обеих сторон и дальнейшая настройка шлюза VPN, правил переадресации и пула адресации для этого VP C. Вместо этого, с точки зрения времени выполнения потока данных, вам не нужно распространять IP-адреса Publi c между участниками потока данных и, несомненно, снижать цену.

Проблема, которую вы упомянули в основном, лежит на стороне кластера Kafka. В связи с тем, что Apache Kafka является распределенной системой, она имеет основной принцип: когда производитель / потребитель выполняет, он запрашивает метаданные о том, какой брокер является лидером для раздела, получая метаданные с конечными точками доступны для этого раздела, поэтому клиент затем подтверждает эти конечные точки для подключения к конкретному брокеру. И, насколько я понимаю, в вашем случае соединение с ведущим осуществляется через слушатель , привязанный к внешнему сетевому интерфейсу, настроенный в server.properties broker с настройкой .

Следовательно, вы можете рассмотреть возможность создания отдельного прослушивателя (если он не существует) в listeners, привязанном к облачному сетевому интерфейсу VP C, и при необходимости распространять advertised.listeners с метаданными, которые возвращаются клиенту, состоящие из данные для подключения к конкретному брокеру.

...