Невозможно подключиться из задания Dataflow к реестру схемы, когда для реестра схемы требуется аутентификация клиента TLS - PullRequest
0 голосов
/ 08 мая 2019

Я разрабатываю задание GCP Cloud Dataflow, в котором используется брокер Kafka и реестр схем.Наш брокер Kafka и реестр Schema требуют сертификат клиента TLS.И я сталкиваюсь с проблемой соединения с Реестром схемы при развертывании.Любое предложение приветствуется.

Вот что я делаю для задания Dataflow.Я создаю Consumer Properties для конфигураций TLS.

props.put("security.protocol", "SSL");
props.put("ssl.truststore.password", "aaa");
props.put("ssl.keystore.password", "bbb");
props.put("ssl.key.password", "ccc"));
props.put("schema.registry.url", "https://host:port")
props.put("specific.avro.reader", true);

и обновляю Consumer Properties с помощью updateConsumerProperties.

Pipeline p = Pipeline.create(options)
...
.updateConsumerProperties(properties)
... 

Как следует из этого ответа stackoverflow, я также загружаю keyStore и trustStore в локальный каталог и указываюРасположение trustStore / keyStore в ConsumerProperties в ConsumerFactory.

Truststore и Google Cloud Dataflow

Pipeline p = Pipeline.create(options)
 ...
 .withConsumerFactoryFn(new MyConsumerFactory(...))
 ...

В ConsumerFactory:

public Consumer<byte[], byte[]> apply(Map<String, Object> config)  {
  // download keyStore and trustStore from GCS bucket 
  config.put("ssl.truststore.location", (Object)localTrustStoreFilePath)
  config.put("ssl.keystore.location", (Object)localKeyStoreFilePath)
  new KafkaConsumer<byte[], byte[]>(config);
}

С этим кодом мне удалось развернуть, но задание Dataflow получилоОшибка проверки сертификата сервера TLS.

Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
        sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
        sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
        sun.security.validator.Validator.validate(Validator.java:260)
        sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
        sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
        sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513)
        sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
        java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
        io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
        io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
        io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
        io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
        io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
        io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
        io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
        io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
        io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
        org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
        org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:779)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

Затем я обнаружил, что клиент реестра схемы загружает конфигурации TLS из системного свойства.https://github.com/confluentinc/schema-registry/issues/943

Я протестировал Kafka Consumer с той же конфигурацией и подтвердил, что он работает нормально.

props.put("schema.registry.url", "https://host:port")
props.put("specific.avro.reader", true);
props.put("ssl.truststore.location", System.getProperty("javax.net.ssl.trustStore"));
props.put("ssl.truststore.password", System.getProperty("javax.net.ssl.keyStore"));
props.put("ssl.keystore.location", System.getProperty("javax.net.ssl.keyStore"));
props.put("ssl.keystore.password", System.getProperty("javax.net.ssl.keyStorePassword"));
props.put("ssl.key.password", System.getProperty("javax.net.ssl.key.password"));

Далее я применил тот же подход, что означает применение тех же конфигураций TLS к системеproperties и Consumer Properties, для кода задания Dataflow.

Я указал пароль по системным свойствам при выполнении приложения.

-Djavax.net.ssl.keyStorePassword=aaa \
-Djavax.net.ssl.key.password=bbb \
-Djavax.net.ssl.trustStorePassword=ccc \

Примечание. Системное свойство для местоположения trustStore и keyStore в Consumer Factory установлено сэти файлы загружаются в локальный временный каталог.

config.put("ssl.truststore.location", (Object)localTrustStoreFilePath)
config.put("ssl.keystore.location", (Object)localKeyStoreFilePath)
System.setProperty("javax.net.ssl.trustStore", localTrustStoreFilePath)
System.setProperty("javax.net.ssl.keyStore", localKeyStoreFilePath)

, но даже при развертывании произошел сбой с ошибкой тайм-аута.

Exception in thread "main" java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
        at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
...
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
Caused by: java.lang.IllegalArgumentException: DataflowRunner requires gcpTempLocation, but failed to retrieve a value from PipelineOptions
        at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:246)
Caused by: java.lang.IllegalArgumentException: Error constructing default value for gcpTempLocation: tempLocation is not a valid GCS path, gs://dev-k8s-rfid-store-dataflow/rfid-store-siv-epc-transactions-to-bq/tmp. 
        at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:255)
...
Caused by: java.lang.RuntimeException: Unable to verify that GCS bucket gs://dev-k8s-rfid-store-dataflow exists.
        at org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:86)
...
Caused by: java.io.IOException: Error getting access token for service account: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:401)
...
Caused by: java.net.SocketException: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at javax.net.ssl.DefaultSSLSocketFactory.throwException(SSLSocketFactory.java:248)
...
Caused by: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at java.security.Provider$Service.newInstance(Provider.java:1617)
...
Caused by: java.io.IOException: Keystore was tampered with, or password was incorrect
    at sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:780)
Caused by: java.security.UnrecoverableKeyException: Password verification failed
    at sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:778)

Я что-то упустил?

Ответы [ 2 ]

0 голосов
/ 13 мая 2019

Я получил ответ от службы поддержки GCP.Похоже, что Apache Beam не поддерживает Schema Registry.

Здравствуйте, специалист по потокам данных связался со мной.Я сейчас разоблачу то, что они сказали мне.

Ответ на ваш вопрос - нет, Apache Beam не поддерживает реестр схем.Тем не менее, они сказали мне, что вы можете самостоятельно выполнять вызовы в Реестр схемы, поскольку Beam использует только необработанные сообщения, и пользователь несет ответственность за то, что ему нужно сделать с данными.

Это основано на нашем понимании случая, когда вы хотите публиковать сообщения в Kafka, и чтобы DF использовал эти сообщения, анализируя их на основе схемы из реестра.

Я надеюсь, что эта информация может быть полезна для вас, дайте мне знать, если я могу помочь вам.

Но задание Dataflow все еще может получать двоичное сообщение в формате Avro.Поэтому вы внутренне вызываете API REST Schema Registry следующим образом.https://stackoverflow.com/a/55917157

0 голосов
/ 08 мая 2019

В ConsumerFactoryFn необходимо скопировать сертификат из некоторого местоположения (например, GCS) в локальный путь к файлу на машине.

В Truststore и Google Cloud Dataflow , ConsumerFnFactory, который пишет пользователь, имеет этот фрагмент кода, который выбирает хранилище доверенных сертификатов из GCS:

            Storage storage = StorageOptions.newBuilder()
                    .setProjectId("prj-id-of-your-bucket")
                    .setCredentials(GoogleCredentials.getApplicationDefault())
                    .build()
                    .getService();
            Blob blob = storage.get("your-bucket-name", "pth.to.your.kafka.client.truststore.jks");
            ReadChannel readChannel = blob.reader();
            FileOutputStream fileOuputStream;
            fileOuputStream = new FileOutputStream("/tmp/kafka.client.truststore.jks"); //path where the jks file will be stored
            fileOuputStream.getChannel().transferFrom(readChannel, 0, Long.MAX_VALUE);
            fileOuputStream.close();
            File f = new File("/tmp/kafka.client.truststore.jks"); //assuring the store file exists
            if (f.exists())
            {
                LOG.debug("key exists");

            }
            else
            {
                LOG.error("key does not exist");

            }

Вам нужно будет сделать нечто подобное (это не обязательно должен быть GCS, но он должен быть доступен со всех виртуальных машин, выполняющих ваш конвейер в облачном потоке данных Google).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...