Добавьте дополнительные потребительские настройки kafka с помощью sparklyr - PullRequest
0 голосов
/ 26 января 2020

Я пытаюсь подключиться к защищенному серверу Kafka с помощью sparklyr. Однако для доступа к нему необходимо указать правильные настройки безопасности (протокол, пароль и т. Д. c). Но при указании в read_options они не передаются в конфигурацию потребителя. Вот R-код:

library(sparklyr)
config <- spark_config()
config$sparklyr.shell.packages <- "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0"
sc <- spark_connect(master = "local",config=config, version="2.4.0")

read_options <- list(
  kafka.bootstrap.servers='test.server',
  group.id="name",
  security.protocol='SSL',
  ssl.key.password="password",
  ssl.keystore.location="C:/Users/...",
  ssl.keystore.password="password",
  ssl.truststore.location="C:/Users/...",
  ssl.truststore.password="password",
  subscribe = "topic")

stream <- stream_read_kafka(sc, options = read_options) 

Если мы посмотрим на журнал искры, в конфигурации потребителя будет указан только сервер: (сокращенная версия)

INFO ConsumerConfig: ConsumerConfig values: 
    bootstrap.servers = [test.server]
    ....
    group.id = spark-kafka-source-7bb43fe7-56b2-4e19-9162-371e4db2075a-1047255113-driver-2
    ....
    security.protocol = PLAINTEXT
    ...
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ..
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

Is Есть ли возможность / способ обойти необходимые настройки для потребителя?

Обновление

См. ответ пользователя 1278798

Для кого-то с такой же проблемой это Важно добавить, что не все настройки поддерживаются искрой (например, group.id или auto.offset.reset). Просто проверьте ссылку, предоставленную user1278798.

1 Ответ

0 голосов
/ 27 января 2020

Как четко объяснено в официальной документации

Собственные конфигурации Кафки можно установить через DataStreamReader.option с префиксом kafka. , например, stream.option("kafka.bootstrap.servers", "host:port").

Вам не хватает префикса.

...