Как узнать имя группы потребителей Kafka, которую потоковый запрос использует для источника данных Kafka? - PullRequest
0 голосов
/ 28 февраля 2019

Я использую данные из темы кафки через структурированную потоковую передачу, в теме 3 раздела.Поскольку структурированная потоковая передача Spark не позволяет вам явно предоставить group.id и назначает некоторый случайный идентификатор потребителю, я попытался проверить идентификаторы группы потребителей, используя приведенную ниже команду kafka

./kafka-consumer-groups.sh --bootstrap-server kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092 --list

output
 spark-kafka-source-054e8dac-bea9-46e8-9374-8298daafcd23--1587684247-driver-0
 spark-kafka-source-756c08e8-6a84-447c-8326-5af1ef0412f5-209267112-driver-0
 spark-kafka-source-9528b191-4322-4334-923d-8c1500ef8194-2006218471-driver-0

Ниже приведены мои вопросы

1) Почему создаются 3 группы потребителей?Это из-за 3 разделов?

2) Можно ли как-нибудь получить имена этих групп потребителей в приложении spark?

3) Несмотря на то, что мое искровое приложение все еще работало, через некоторое время названия этих групп не появились в списке групп потребителей.Это потому, что все данные были использованы приложением spark, и в этой теме кафки больше не было данных?

4) Если мое предположение верно в отношении пункта 3, создаст ли он новый идентификатор группы потребителей, если поступят новые данныеили имя группы потребителей останется прежним?

Ниже приведен мой поток чтения

  val inputDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
 // .option("assign"," {\""+topic+"\":[0]}") 
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 60000)
  .load()

У меня есть 3 потоковых приложения в приложении, как показано ниже

  val df = inputDf.selectExpr("CAST(value AS STRING)","CAST(topic AS STRING)","CAST (partition AS INT)","CAST (offset AS INT)","CAST (timestamp AS STRING)") 
  val df1 = inputDf.selectExpr("CAST (partition AS INT)","CAST (offset AS INT)","CAST (timestamp AS STRING)")

//First stream
 val checkpoint_loc1= "/warehouse/test_duplicate/download/chk1"
   df1.agg(min("offset"), max("offset"))
  .writeStream
  .foreach(writer)
  .outputMode("complete")
  .option("checkpointLocation", checkpoint_loc1).start()
val result = df.select(
df1("result").getItem("_1").as("col1"),
df1("result").getItem("_2").as("col2"),
df1("result").getItem("_5").as("eventdate"))
val distDates = result.select(result("eventdate")).distinct

//Second stream
val checkpoint_loc2=  "/warehouse/test_duplicate/download/chk2" 
distDates.writeStream.foreach(writer1)
  .option("checkpointLocation", checkpoint_loc2).start() 

//Third stream
val kafkaOutput =result.writeStream
  .outputMode("append")
  .format("orc")
  .option("path",data_dir)
  .option("checkpointLocation", checkpoint_loc3)
  .start()

Потоковый запросиспользуется только один раз в коде и нет соединений.

План выполнения

== Parsed Logical Plan ==
 StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

== Analyzed Logical Plan ==
key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

== Optimized Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

== Physical Plan ==
StreamingRelation kafka, [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]

1 Ответ

0 голосов
/ 07 марта 2019

1) Почему он создает 3 группы потребителей?Это из-за 3 разделов?

Конечно нет.Это просто совпадение.Похоже, что вы уже запускали приложение 3 раза, а в теме 3 раздела.

Давайте начнем сначала, чтобы создать резервную копию.

Я удалил все группы потребителей, чтобы убедиться, что мы начинаем заново.

$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0

$ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0
Deletion of requested consumer groups ('spark-kafka-source-cd8c4070-cac0-4653-81bd-4819501769f9-1567209638-driver-0') was successful.

$ ./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --delete --group spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0
Deletion of requested consumer groups ('spark-kafka-source-6a60f735-f05c-49e4-ae88-a6193e7d4bf8--525530617-driver-0') was successful.

$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
// nothing got printed out

Я создал тему с 5 разделами.

$ ./bin/kafka-topics.sh --create --zookeeper :2181 --topic jacek-five-partitions --partitions 5 --replication-factor 1
Created topic "jacek-five-partitions".

$ ./bin/kafka-topics.sh --describe --zookeeper :2181 --topic jacek-five-partitions
Topic:jacek-five-partitions PartitionCount:5    ReplicationFactor:1 Configs:
    Topic: jacek-five-partitions    Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: jacek-five-partitions    Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: jacek-five-partitions    Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: jacek-five-partitions    Partition: 3    Leader: 0   Replicas: 0 Isr: 0
    Topic: jacek-five-partitions    Partition: 4    Leader: 0   Replicas: 0 Isr: 0

Я использую следующий код:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object SparkApp extends App {

  val spark = SparkSession.builder.master("local[*]").getOrCreate()
  import spark.implicits._
  val q = spark
    .readStream
    .format("kafka")
    .option("startingoffsets", "latest")
    .option("subscribe", "jacek-five-partitions")
    .option("kafka.bootstrap.servers", ":9092")
    .load
    .select($"value" cast "string")
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .start
  q.awaitTermination()
}

Когда я запускаю вышеупомянутую Spark StructuredПотоковое приложение, у меня есть только одна созданная группа потребителей.

$ ./bin/kafka-consumer-groups.sh --list --bootstrap-server :9092
spark-kafka-source-380da653-c829-45db-859f-09aa9b37784d-338965656-driver-0

И это имеет смысл, поскольку вся обработка Spark должна использовать столько потребителей Kafka, сколько существует разделов, но независимо от количества потребителей, там должно бытьтолько одна группа потребителей (или потребители Kafka будут использовать все записи, и в них будут дубликаты).


2) Можно ли как-нибудь получить имена этих групп потребителей в приложении spark?

Для этого нет общедоступного API, поэтому ответ - нет.

Однако вы можете взломать Spark и перейти ниже общедоступного API до внутреннего Kafka.потребитель, который использует эту строку :

val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

Или даже эту строку , если быть точным:

val kafkaOffsetReader = new KafkaOffsetReader(
  strategy(caseInsensitiveParams),
  kafkaParamsForDriver(specifiedKafkaParams),
  parameters,
  driverGroupIdPrefix = s"$uniqueGroupId-driver")

Просто найдите KafkaMicroBatchReaderдля источника данных Kafka запросите его для KafkaOffsetReader, который знает groupId.Это кажется выполнимым.


Несмотря на то, что мое искровое приложение все еще работало, через некоторое время эти имена групп не отображались в списке групп потребителей.Это связано с тем, что все данные были использованы искровым приложением, и в этой теме kafka больше не было данных?

Может ли это быть связано с KIP-211: пересмотреть семантику истечения срока действия смещений групп потребителей, который говорит:

Смещение тематического раздела в группе потребителей истекает, когда достигается отметка времени истечения, связанная с этим разделом.На эту метку времени истечения обычно влияет конфигурация брокера offsets.retention.minutes, если только пользователь не переопределяет это значение по умолчанию и не использует настраиваемое сохранение.


4) создаст новую группу потребителейid, если поступят новые данные или имя группы потребителей останется прежним?

останется прежним.

Кроме того, группу потребителей нельзя удалять, когда хотя бы один потребитель изгруппа активна.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...