Я хочу написать структурированного потребителя Kafka для потоковой искровой потоковой передачи, который считывает данные из раздела Kafka с одним разделом, перераспределяет входящие данные по «ключу» на 3 раздела Spark, сохраняя сообщения, упорядоченные по ключу, и записывает их в другую тему Kafka. с 3 перегородками.
Я использовал Dataframe.repartition(3, $"key")
, который, я полагаю, использует HashPartitioner. Код указан ниже.
Когда я выполнил запрос с типом триггера с фиксированным интервалом, я визуально проверил выходные сообщения в ожидаемом порядке. Я предполагаю, что порядок на полученном разделе не гарантирован. Я ожидаю получить какое-либо подтверждение или вето в моем предположении с точки зрения указателей кода в репо или документации спарк-кода.
Я также пытался использовать Dataframe.sortWithinPartitions
, однако, похоже, это не поддерживается для потокового фрейма данных без агрегирования.
Один из вариантов, который я попробовал, состоял в том, чтобы преобразовать Dataframe в RDD и применить repartitionAndSortWithinPartitions
, который перераспределяет RDD в соответствии с заданным разделителем и в каждом полученном разделе сортирует записи по их ключам. Однако тогда я не могу использовать этот RDD в операции query.writestream для записи результата в тему вывода Kafka.
- Существует ли API перераспределения фреймов данных, который помогает сортировать перераспределенные данные в контексте потоковой передачи?
- Есть ли другие альтернативы?
- Предоставляет ли тип триггера по умолчанию или тип триггера с фиксированным интервалом для выполнения микропакета какие-либо гарантии порядка сообщений?
Входящие данные:
case class KVOutput(key: String, ts: Long, value: String, spark_partition: Int)
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers.get)
.option("subscribe", Array(kafkaInputTopic.get).mkString(","))
.option("maxOffsetsPerTrigger",30)
.load()
val inputDf = df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
val resDf = inputDf.repartition(3, $"key")
.select(from_json($"value", schema).as("kv"))
.selectExpr("kv.key", "kv.ts", "kv.value")
.withColumn("spark_partition", spark_partition_id())
.select($"key", $"ts", $"value", $"spark_partition").as[KVOutput]
.sortWithinPartitions($"ts", $"value")
.select($"key".cast(StringType).as("key"), to_json(struct($"*")).cast(StringType).as("value"))
val query = resDf.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBrokers.get)
.option("topic", kafkaOutputTopic.get)
.option("checkpointLocation", checkpointLocation.get)
.start()
Когда я отправляю это приложение, оно перестает работать с
8/11/08 22:13:20 ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;