Как указать пользовательскую конфигурацию потоковых запросов Kafka с источником данных Kafka (например, аутентификация Confluent Cloud)? - PullRequest
1 голос
/ 04 ноября 2019

Я хочу читать и писать, используя структурированную потоковую передачу в Confluent Cloud. Проблема в том, что я не могу найти метод для аутентификации в документации.

У меня следующее соединение для передачи данных:

ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=pkc-nq5ga.westeurope.azure.confluent.cloud:9092
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<CLUSTER_API_KEY>" password="<CLUSTER_API_SECRET>";
security.protocol=SASL_SSL

Когда я проверил локальный хост без пароля и у меня не было проблем с ним.

val inputStream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("subscribe", inputTopic)
    .option("startingOffsets", startingOffsetsValue)
    .load()

 outputStream.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", outputBrokers)
    .option("topic", outputTopic)
    .option("checkpointLocation", pathCheckpoint)
    .start()
    .awaitTermination()

Кто-то знает, как пройти конфигурацию аутентификации, чтобы попасть в облако слияния

1 Ответ

2 голосов
/ 05 ноября 2019

Цитирование официальной документации Конкретные конфигурации Kafka :

Собственные конфигурации Kafka могут быть установлены через DataStreamReader.option с префиксом kafka., например, stream.option("kafka.bootstrap.servers", "host:port")

С этим мы можем передать данные соединения, например

.option("kafka.ssl.endpoint.identification.algorithm", "https")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...