Как лучше рассчитать скользящие метрики, которые охватывают партии в структурированной потоковой передаче Spark? - PullRequest
2 голосов
/ 23 апреля 2020

Я использую события из концентратора событий на Azure и хочу рассчитать показатели, которые основаны не на отдельных событиях, а на парах событий. Я нашел много информации о том, как работать с агрегатами, но не так много зависимых от последовательности показателей (например, опережение, отставание). Я понимаю, что в Spark Streaming нет таких готовых функций.

Допущения для решения проблемы:

  1. События от устройств Я получаю, появляются в правильном порядке. Из двух сообщений, отправленных с одного устройства, одно с большим смещением обозначает самое последнее событие.
  2. В концентраторе событий больше уникальных устройств, чем разделов (т.е. последовательность сообщений в разделе не может предполагается, что он принадлежит одному и тому же устройству

Цель:

Чтобы иметь возможность вычислять столбцы опережающих и запаздывающих значений для всех сообщений в пакетном режиме, т.е. только рассчитайте его по прибытии новой дельты.

+---------+-----------------+--------+-----------+------------+
| batchId | deviceReference | offset | lagOffset | leadOffset |
+---------+-----------------+--------+-----------+------------+
|     100 | a               |      1 | null      | 3          |
|     100 | b               |      2 | null      | 5          |
|     100 | a               |      3 | 1         | 6          |
|     200 | c               |      4 | null      | 7          |
|     200 | b               |      5 | 2         | null       |
|     300 | a               |      6 | 3         | null       |
|     300 | c               |      7 | 4         | null       |
+---------+-----------------+--------+-----------+------------+

lagOffset и leadOffset - это столбцы, которые я хочу вычислить, а batchId - это идентификатор событий, потребляемых из концентратора событий в данный момент времени. Очевидно, без доступа к предыдущее и следующее смещение для deviceReference, первая и последняя запись в каждом batchId будут иметь нулевое значение lag- и leadOffset, поэтому я решил, что кэширование даст go.

попытку

Создать пустой DF и сохранить (только при первом запуске, в противном случае существует кэш)

if (!positionCacheExists)
  spark.createDataFrame(sc.emptyRDD[Row], batchCacheSchema)
    .write
    .partitionBy("deviceReference")
    .format("json")
    .mode(org.apache.spark.sql.SaveMode.Append)
    .save(batchCachePath)

Crea те оконные функции, которые используются для расчета опережения / отставания и для исключения последней записи (так как в ней будет отсутствовать значение опережения, и она будет сохранена в следующей партии (если опережение поступило).

    val w = Window.partitionBy("deviceReference").orderBy(desc("offset"))
    val leadLag = Window.partitionBy("deviceReference").orderBy("offset")

Поток процесса

  incomingDeviceStream.writeStream
                      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>

                            //empty if first time, otherwise contains the second latest events per device from previous batch
                            val batchCache = spark.read.schema(lastBatchSchema).format("json").load(positionCachePath)
                            val modBatch = batchDF.withColumn("maxRow", rank().over(w))

                            modBatch.join(batchCache, 
                                          modBatch("deviceReference")===batchCache("deviceReference") &&
                                          modBatch("offset")>batchCache("offset"))
                                  .withColumn("last", batchCache("offset"))
                                  .union(modBatch.join(batchCache,
                                          modBatch("deviceReference")===batchCatche("deviceReference"),
                                          "left_anti")
                                  .withColumn("last", lit(null)))
                                  .withColumn("lagOffset", coalesce(lag(col("offset"),1).over(leadLag), col("last")).as("lagOffset"))
                                  .withColumn("leadOffset", lead(col("offset"),1).over(leadLag).as("leadOffset"))
                                  .filter(col("maxRow")>1)
                                  .write
                                  .partitionBy("deviceReference")
                                  .format("parquet")
                                  .mode(Append)
                                  .save(deviceEventPath)

                            //Store the second latest event per device as the last will have null as lead and will be set when the next event comes
                            modBatch.filter(col("maxRow")===2)
                                  .write
                                  .partitionBy("deviceReference")
                                  .format("json")
                                  .mode(Overwrite)
                                  .save(batchCachePath)
                                       }
                      .trigger(Trigger.Once())
                      .start()
                      .awaitTermination()

Отражения:

  • Это кажется очень хрупким и неудобным, как будто я испортил состояние в batchCache DF, у меня будет беспорядок. Мне бы хотелось использовать встроенные функции в качестве контрольных точек, но я не могу понять, как заставить их работать для меня, когда мне нужно изменить объем пакета, чтобы исключить последнюю запись для устройства.
  • Очевидно, я можно применить задержку опережений к целевой таблице потокового задания, но это сделает вычисления намного больше, избыточнее, и это также приведет к обновлению данных, если только последняя запись на устройство не будет обрезана (та, где опережает ноль).

Вопросы:

  • Я думаю, что мне здесь чего-то не хватает. Поиск соседних событий для события очень важен для мер, которые я хочу рассчитать, и я уверен, что я не единственный. Если бы у всех устройств были свои собственные разделы, я думаю, что проблема была бы решена, но, поскольку механизм разделения в Event Hub хэширует, я не знаю, как это можно было бы применить. Может кто-нибудь сказать мне, если я атакую ​​это не в том месте?
    • Если я не атакую ​​это не в том месте, какие функции Spark я не использую, чтобы использовать его для повышения надежности?

Приветствия

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...