Сколько потребителей Kafka использует потоковый запрос для выполнения? - PullRequest
0 голосов
/ 04 декабря 2018

Я был удивлен, увидев, что Spark получает данные из Kafka только с одним потребителем Kafka, и этот потребитель работает в контейнере драйвера.Я скорее ожидал увидеть, что Spark создает столько потребителей, сколько число разделов в теме, и запускает их в контейнерах исполнителя.

Например, у меня есть тема events с5 перегородок.Я запускаю приложение Spark Structured Streaming, которое использует эту тему и пишет в Parquet на HDFS.Приложение имеет 5 исполнителей.При рассмотрении группы потребителей Kafka, созданной Spark, я вижу, что только один потребитель отвечает за все 5 разделов.Этот потребитель работает на машине с программой драйвера:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group spark-kafka-source-08e10acf-7234-425c-a78b-3552694f22ef--1589131535-driver-0

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
events          2          -               0               -               consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147  consumer-1
events          1          -               0               -               consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147  consumer-1
events          0          -               0               -               consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147  consumer-1
events          4          -               0               -               consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147  consumer-1
events          3          -               0               -               consumer-1-8c3d806d-eb1e-4536-97d5-7c9d19582942 /192.168.100.147  consumer-1

После проверки журналов всех 5 исполнителей я обнаружил, что только один из них был занят записью потребленных данных в расположение Parquet в HDFS.Остальные 4 бездействовали.

Это странно.Я ожидал, что 5 исполнителей будут использовать данные параллельно с 5 разделов Kafka и писать параллельно на HDFS.Означает ли это, что программа драйвера потребляет данные из Kafka и распределяет их по исполнителям?Это выглядит как узкое место.

ОБНОВЛЕНИЕ 1 Я попытался добавить перераспределение (5) к кадру данных потока:

spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "brokerhost:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "earliest")
    .load()
    .repartition(5)

Послечто я видел, как все 5 исполнителей записывают данные в HDFS (по их журналам).Тем не менее, я видел только одного потребителя (программу-драйвер) на всех 5 разделах темы Kafka.

ОБНОВЛЕНИЕ 2 Версия Spark 2.4.0.Вот команда для подачи заявки:

spark-submit \
--name "Streaming Spark App" \
--master yarn \
--deploy-mode cluster \
--conf spark.yarn.maxAppAttempts=1 \
--conf spark.executor.instances=5 \
--conf spark.sql.shuffle.partitions=5 \
--class example.ConsumerMain \
"$jar_file
...