Создает ли Kafka Direct Stream отдельную группу потребителей (так как ей не важно, какое свойство group.id указано в приложении) - PullRequest
0 голосов
/ 27 февраля 2020

Допустим, я только что запустил приложение Kafka Direct Stream + Spark Streaming. Для первого пакета потоковый контекст в программе драйвера подключается к Kafka и извлекает startOffset и endOffset. Затем он запускает искровое задание с этими диапазонами начального и конечного смещения, чтобы исполнители могли получать записи из Kafka. Мой вопрос начинается здесь. Когда время для второго пакета, контекст потоковой передачи подключается к Kafka для начального и конечного диапазонов смещения. Как Кафка может выдавать эти диапазоны, когда нет группы потребителей (так как прямой поток не учитывает group.id), которая позволяет хранить значение последнего смещения фиксации?

1 Ответ

0 голосов
/ 27 февраля 2020

При работе с API-интерфейсом Kafka Consumer Group всегда . Неважно, с каким потоком вы имеете дело (Spark Direct Streaming, Spark Структурированная потоковая передача, Java / Scala API Kafka Consumer ...).

, поскольку прямой поток не имеет принять во внимание group.id

Посмотрите руководство по интеграции Spark + Kafka для прямой потоковой передачи (пример кода для spark-streaming-kafka010) о том, как объявить потребителя group:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

Даже если вы не объявляете группу потребителей в своей конфигурации, для вас по-прежнему будет создана (случайная) группа потребителей.

Проверьте журналы, чтобы узнать, какие group.id был использован в вашем приложении.

...