Я использую данные из темы кафки через структурированную потоковую передачу, в теме 3 раздела.Поскольку структурированная потоковая передача Spark не позволяет вам явно предоставить group.id и назначает некоторый случайный идентификатор потребителю, я попытался проверить идентификаторы группы потребителей, используя приведенную ниже команду kafka
./kafka-consumer-groups.sh --bootstrap-server kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092 --list
output
spark-kafka-source-054e8dac-bea9-46e8-9374-8298daafcd23--1587684247-driver-0
spark-kafka-source-756c08e8-6a84-447c-8326-5af1ef0412f5-209267112-driver-0
spark-kafka-source-9528b191-4322-4334-923d-8c1500ef8194-2006218471-driver-0
Ниже приведены мои вопросы
1) Почему создаются 3 группы потребителей?Это из-за 3 разделов?
2) Можно ли как-нибудь получить имена этих групп потребителей в приложении spark?
3) Несмотря на то, что мое искровое приложение все еще работало, через некоторое время названия этих групп не появились в списке групп потребителей.Это потому, что все данные были использованы приложением spark, и в этой теме кафки больше не было данных?
4) Если мое предположение верно в отношении пункта 3, создаст ли он новый идентификатор группы потребителей, если поступят новые данныеили имя группы потребителей останется прежним?
Ниже приведен мой поток чтения
val inputDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topic)
// .option("assign"," {\""+topic+"\":[0]}")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 60000)
.load()
У меня есть 3 потоковых приложения в приложении, как показано ниже
val df = inputDf.selectExpr("CAST(value AS STRING)","CAST(topic AS STRING)","CAST (partition AS INT)","CAST (offset AS INT)","CAST (timestamp AS STRING)")
val df1 = inputDf.selectExpr("CAST (partition AS INT)","CAST (offset AS INT)","CAST (timestamp AS STRING)")
//First stream
val checkpoint_loc1= "/warehouse/test_duplicate/download/chk1"
df1.agg(min("offset"), max("offset"))
.writeStream
.foreach(writer)
.outputMode("complete")
.option("checkpointLocation", checkpoint_loc1).start()
val result = df.select(
df1("result").getItem("_1").as("col1"),
df1("result").getItem("_2").as("col2"),
df1("result").getItem("_5").as("eventdate"))
val distDates = result.select(result("eventdate")).distinct
//Second stream
val checkpoint_loc2= "/warehouse/test_duplicate/download/chk2"
distDates.writeStream.foreach(writer1)
.option("checkpointLocation", checkpoint_loc2).start()
//Third stream
val kafkaOutput =result.writeStream
.outputMode("append")
.format("orc")
.option("path",data_dir)
.option("checkpointLocation", checkpoint_loc3)
.start()
Потоковый запросиспользуется только один раз в коде и нет соединений.
План выполнения
== Parsed Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
== Analyzed Logical Plan ==
key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
== Optimized Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@109e44ba, kafka, Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@593197cb,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 60000, startingOffsets -> earliest, subscribe -> downloading, kafka.bootstrap.servers -> kfk01.sboxdc.com:9092,kfk02.sboxdc.com:9092,kfk03.sboxdc.com:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
== Physical Plan ==
StreamingRelation kafka, [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]