Я пытаюсь развернуть конвейер потока данных облака Google, который читает из кластера Kafka, обрабатывает его записи и затем записывает результаты в BigQuery. Однако при попытке развертывания я продолжаю сталкиваться со следующим исключением:
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata for Kafka Cluster
Для кластера Kafka требуется использовать конфигурацию JAAS для аутентификации, и я использую приведенный ниже код для установки свойств, необходимых для KafkaIO. читать Apache Метод луча:
// Kafka properties
Map<String, Object> kafkaProperties = new HashMap<String, Object>(){{
put("request.timeout.ms", 900000);
put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"USERNAME\" password=\"PASSWORD\";");
put(CommonClientConfigs.GROUP_ID_CONFIG, GROUP_ID);
}};
// Build & execute pipeline
pipeline
.apply(
"ReadFromKafka",
KafkaIO.<Long, String>read()
.withBootstrapServers(properties.getProperty("kafka.servers"))
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withTopic(properties.getProperty("kafka.topic")).withConsumerConfigUpdates(kafkaProperties))
Конвейер потока данных должен быть развернут с отключенными публичными c IP-адресами, но существует установленный VPN-туннель от нашей сети Google Cloud VP C до Кафки Кластер и требуемая маршрутизация для частных ips на обеих сторонах настроены, и их IP-адреса внесены в белый список. Я могу пропинговать и подключиться к сокету сервера Kafka, используя виртуальную машину Compute Engine в той же подсети VPN, что и развертываемое задание потока данных.
Я думал, что есть проблема с конфигурацией, но я не могу выяснить, если я пропускаю дополнительное поле или одно из существующих настроено неправильно. Кто-нибудь знает, как я могу диагностировать проблему дальше, так как выброшенное исключение действительно не определяет проблему? Любая помощь будет принята с благодарностью.
Редактировать: Теперь я могу успешно развернуть задание потока данных, однако кажется, что чтение не работает правильно. После просмотра журналов, чтобы проверить ошибки в задании потока данных, я вижу, что после обнаружения координатора группы для kafka topi c, нет никаких других операторов журнала до оператора журнала предупреждений о том, что закрытие простоя читателя тайм-аут:
Close timed out with 1 pending requests to coordinator, terminating client connections
, за которым следует неперехваченное исключение с root причиной:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition test-partition could be determined
При этом возникает ошибка:
Execution of work for P0 for key 0000000000000001 failed. Will retry locally.
Может ли это быть проблемой с определением ключа, так как темы Kafka фактически не имеют ключей в сообщениях? Когда я просматриваю топи c в Kafka Tool, единственные столбцы, наблюдаемые в данных, состоят из смещения, сообщения и метки времени.