Как сохранить порядок событий для ключа в структурированном потоковом распределении по ключу? - PullRequest
0 голосов
/ 09 ноября 2018

Я хочу написать структурированного потребителя Kafka для потоковой искровой потоковой передачи, который считывает данные из раздела Kafka с одним разделом, перераспределяет входящие данные по «ключу» на 3 раздела Spark, сохраняя сообщения, упорядоченные по ключу, и записывает их в другую тему Kafka. с 3 перегородками.

Я использовал Dataframe.repartition(3, $"key"), который, я полагаю, использует HashPartitioner. Код указан ниже.

Когда я выполнил запрос с типом триггера с фиксированным интервалом, я визуально проверил выходные сообщения в ожидаемом порядке. Я предполагаю, что порядок на полученном разделе не гарантирован. Я ожидаю получить какое-либо подтверждение или вето в моем предположении с точки зрения указателей кода в репо или документации спарк-кода.

Я также пытался использовать Dataframe.sortWithinPartitions, однако, похоже, это не поддерживается для потокового фрейма данных без агрегирования.

Один из вариантов, который я попробовал, состоял в том, чтобы преобразовать Dataframe в RDD и применить repartitionAndSortWithinPartitions, который перераспределяет RDD в соответствии с заданным разделителем и в каждом полученном разделе сортирует записи по их ключам. Однако тогда я не могу использовать этот RDD в операции query.writestream для записи результата в тему вывода Kafka.

  1. Существует ли API перераспределения фреймов данных, который помогает сортировать перераспределенные данные в контексте потоковой передачи?
  2. Есть ли другие альтернативы?
  3. Предоставляет ли тип триггера по умолчанию или тип триггера с фиксированным интервалом для выполнения микропакета какие-либо гарантии порядка сообщений?

Входящие данные:

case class KVOutput(key: String, ts: Long, value: String, spark_partition: Int)

val df = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers.get)
  .option("subscribe", Array(kafkaInputTopic.get).mkString(","))
  .option("maxOffsetsPerTrigger",30)
  .load()

val inputDf = df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
val resDf = inputDf.repartition(3, $"key")
  .select(from_json($"value", schema).as("kv"))
  .selectExpr("kv.key", "kv.ts", "kv.value")
  .withColumn("spark_partition", spark_partition_id())
  .select($"key", $"ts", $"value", $"spark_partition").as[KVOutput]
  .sortWithinPartitions($"ts", $"value")
  .select($"key".cast(StringType).as("key"), to_json(struct($"*")).cast(StringType).as("value"))

val query = resDf.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers.get)
  .option("topic", kafkaOutputTopic.get)
  .option("checkpointLocation", checkpointLocation.get)
  .start()

Когда я отправляю это приложение, оно перестает работать с

8/11/08 22:13:20 ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...