Я был удивлен, увидев, что Spark получает данные из Kafka только с одним потребителем Kafka, и этот потребитель работает в контейнере драйвера.Я скорее ожидал увидеть, что Spark создает столько потребителей, сколько число разделов в теме, и запускает их в контейнерах исполнителя.
Например, у меня есть тема events с5 перегородок.Я запускаю приложение Spark Structured Streaming, которое использует эту тему и пишет в Parquet на HDFS.Приложение имеет 5 исполнителей.При рассмотрении группы потребителей Kafka, созданной Spark, я вижу, что только один потребитель отвечает за все 5 разделов.Этот потребитель работает на машине с программой драйвера:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group spark-kafka-source-08e10acf-7234-425c-a78b-3552694f22ef--1589131535-driver-0
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
events 2 - 0 - consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147 consumer-1
events 1 - 0 - consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147 consumer-1
events 0 - 0 - consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147 consumer-1
events 4 - 0 - consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147 consumer-1
events 3 - 0 - consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147 consumer-1
После проверки журналов всех 5 исполнителей я обнаружил, что только один из них был занят записью потребленных данных в расположение Parquet в HDFS.Остальные 4 бездействовали.
Это странно.Я ожидал, что 5 исполнителей будут использовать данные параллельно с 5 разделов Kafka и писать параллельно на HDFS.Означает ли это, что программа драйвера потребляет данные из Kafka и распределяет их по исполнителям?Это выглядит как узкое место.
ОБНОВЛЕНИЕ 1 Я попытался добавить перераспределение (5) к кадру данных потока:
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "brokerhost:9092")
.option("subscribe", "events")
.option("startingOffsets", "earliest")
.load()
.repartition(5)
Послечто я видел, как все 5 исполнителей записывают данные в HDFS (по их журналам).Тем не менее, я видел только одного потребителя (программу-драйвер) на всех 5 разделах темы Kafka.
ОБНОВЛЕНИЕ 2 Версия Spark 2.4.0.Вот команда для подачи заявки:
spark-submit \
--name "Streaming Spark App" \
--master yarn \
--deploy-mode cluster \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.executor.instances=5 \
--conf spark.sql.shuffle.partitions=5 \
--class example.ConsumerMain \
"$jar_file