Подключитесь к Kafka с помощью SSL с помощью KafkaIO в Google Dataflow - PullRequest
0 голосов
/ 24 января 2019

С сервера я смог подключиться и получить данные из раздела удаленного сервера kafka, для которого настроен SSL.

От GCP, Как я могу подключиться к удаленному серверу kafka, используя конвейер данных Google, передавая хранилище доверенных сертификатов SSL, расположения сертификатов хранилища ключей и учетную запись службы Google json?

Я использую плагин Eclipse для бегуна потока данныхoption.

Если я указываю на сертификат в GCS, выдается ошибка, когда сертификаты указывают на корзину хранения Google.


Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

Caused by: org.apache.kafka.common.KafkaException:
 java.io.FileNotFoundException: 
gs:/bucket/folder/truststore-client.jks (No such file or directory)

Подписано: Truststore и Google Cloud Dataflow

Обновлен код, указывающий хранилище доверенных сертификатов SSL, расположение хранилища ключей в каталоге / tmp локального компьютера в случае, если KafkaIO необходимо прочитать изПуть файла.Он не выдавал FileNotFoundError.

При попытке запустить серверный клиентский код Java из учетной записи GCP, а также с помощью потока данных - конвейер Java Beam, я получаю следующую ошибку.


ssl.truststore.location = <LOCAL MACHINE CERTICATE FILE PATH>
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka version : 1.0.0
org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : aaa7af6d4a11b29d
org.apache.kafka.common.network.SslTransportLayer close
WARNING: Failed to send SSL Close message 
java.io.IOException: Broken pipe

org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:81)
    at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.start(ExecutorServiceParallelExecutor.java:153)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:205)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
    at 

org.apache.kafka.common.utils.LogContext$KafkaLogger warn
WARNING: [Consumer clientId=consumer-1, groupId=test-group] Connection to node -2 terminated during authentication. This may indicate that authentication failed due to invalid credentials.

Любые предложения или примеры приветствуются.

1 Ответ

0 голосов
/ 01 февраля 2019

Git clone или загрузка проекта Java Maven с локального компьютера в домашний каталог GCP Cloud Shell. Скомпилируйте проект с помощью команды Dataflow runner на терминале Cloud Shell.

mvn -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=com.packagename.JavaClass \
      -Dexec.args="--project=PROJECT_ID \
      --stagingLocation=gs://BUCKET/PATH/ \
      --tempLocation=gs://BUCKET/temp/ \
      --output=gs://BUCKET/PATH/output \
      --runner=DataflowRunner"

Убедитесь, что для runner задан DataflowRunnner.class, и вы видите задание в консоли потока данных при его запуске в облаке. Выполнения DirectRunner не будут отображаться на консоли потока данных в облаке.

Поместите сертификаты в папку ресурсов в проекте Maven и читайте файлы с помощью ClassLoader.

ClassLoader classLoader = getClass().getClassLoader();
File file = new File(classLoader.getResource("keystore.jks").getFile());    
resourcePath.put("keystore.jks",file.getAbsoluteFile().getPath());

Напишите ConsumerFactoryFn () для копирования сертификатов в каталог потока данных / tmp /, как описано в https://stackoverflow.com/a/53549757/4250322

Использовать KafkaIO со свойствами пути ресурса.

Properties props = new Properties();
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/tmp/truststore.jks");    
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/tmp/keystore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  PASSWORD);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  PASSWORD); 
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  PASSWORD);

//other properties
...

PCollection<String> collection = p.apply(KafkaIO.<String, String>read()
                .withBootstrapServers(BOOTSTRAP_SERVERS)
                .withTopic(TOPIC)                                
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)                
                .updateConsumerProperties(props)
                .withConsumerFactoryFn(new ConsumerFactoryFn())
                .withMaxNumRecords(50)
                .withoutMetadata()
        ).apply(Values.<String>create());

// Apply Beam transformations and write to output.

...