Spark 2.4.0 Структурированный потоковый набор Kafka customer group.id - PullRequest
1 голос
/ 26 марта 2019

Я хочу использовать Spark Structured Streaming для чтения с защищенной кафки. Это означает, что мне нужно будет форсировать определенный group.id. Однако, как указано в документации, это невозможно. Тем не менее, в документации по базам данных https://docs.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl, говорится, что это возможно. Относится ли это только к лазурному кластеру?

Кроме того, взглянув на документацию основной ветки репо apache / spark https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md,, мы можем понять, что такая функциональность предназначена для добавления в более поздние версии спарк-релизов. Известны ли вам какие-либо планы такого стабильного выпуска, которые позволят установить эту группу потребителей. Id?

Если нет, существуют ли какие-либо обходные пути для Spark 2.4.0, чтобы иметь возможность установить определенную группу потребителей. Id?

1 Ответ

1 голос
/ 26 марта 2019

В настоящее время (v2.4.0) это невозможно.

Вы можете проверить следующие строки в проекте Apache Spark:

https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L81 - generate group.id

https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L534 - установить его в свойствах, которые используются для создания KafkaConsumer

В основной ветви Вы можете найти модификацию, позволяющую установить префикс или конкретный group.id

https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L83 - генерировать group.id на основе группыпрефикс (groupidprefix)

https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L543 - установить ранее созданный groupId, если kafka.group.id не было передано в свойствах

...