Я использую события из концентратора событий на Azure и хочу рассчитать показатели, которые основаны не на отдельных событиях, а на парах событий. Я нашел много информации о том, как работать с агрегатами, но не так много зависимых от последовательности показателей (например, опережение, отставание). Я понимаю, что в Spark Streaming нет таких готовых функций.
Допущения для решения проблемы:
- События от устройств Я получаю, появляются в правильном порядке. Из двух сообщений, отправленных с одного устройства, одно с большим смещением обозначает самое последнее событие.
- В концентраторе событий больше уникальных устройств, чем разделов (т.е. последовательность сообщений в разделе не может предполагается, что он принадлежит одному и тому же устройству
Цель:
Чтобы иметь возможность вычислять столбцы опережающих и запаздывающих значений для всех сообщений в пакетном режиме, т.е. только рассчитайте его по прибытии новой дельты.
+---------+-----------------+--------+-----------+------------+
| batchId | deviceReference | offset | lagOffset | leadOffset |
+---------+-----------------+--------+-----------+------------+
| 100 | a | 1 | null | 3 |
| 100 | b | 2 | null | 5 |
| 100 | a | 3 | 1 | 6 |
| 200 | c | 4 | null | 7 |
| 200 | b | 5 | 2 | null |
| 300 | a | 6 | 3 | null |
| 300 | c | 7 | 4 | null |
+---------+-----------------+--------+-----------+------------+
lagOffset и leadOffset - это столбцы, которые я хочу вычислить, а batchId - это идентификатор событий, потребляемых из концентратора событий в данный момент времени. Очевидно, без доступа к предыдущее и следующее смещение для deviceReference, первая и последняя запись в каждом batchId будут иметь нулевое значение lag- и leadOffset, поэтому я решил, что кэширование даст go.
попытку
Создать пустой DF и сохранить (только при первом запуске, в противном случае существует кэш)
if (!positionCacheExists)
spark.createDataFrame(sc.emptyRDD[Row], batchCacheSchema)
.write
.partitionBy("deviceReference")
.format("json")
.mode(org.apache.spark.sql.SaveMode.Append)
.save(batchCachePath)
Crea те оконные функции, которые используются для расчета опережения / отставания и для исключения последней записи (так как в ней будет отсутствовать значение опережения, и она будет сохранена в следующей партии (если опережение поступило).
val w = Window.partitionBy("deviceReference").orderBy(desc("offset"))
val leadLag = Window.partitionBy("deviceReference").orderBy("offset")
Поток процесса
incomingDeviceStream.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
//empty if first time, otherwise contains the second latest events per device from previous batch
val batchCache = spark.read.schema(lastBatchSchema).format("json").load(positionCachePath)
val modBatch = batchDF.withColumn("maxRow", rank().over(w))
modBatch.join(batchCache,
modBatch("deviceReference")===batchCache("deviceReference") &&
modBatch("offset")>batchCache("offset"))
.withColumn("last", batchCache("offset"))
.union(modBatch.join(batchCache,
modBatch("deviceReference")===batchCatche("deviceReference"),
"left_anti")
.withColumn("last", lit(null)))
.withColumn("lagOffset", coalesce(lag(col("offset"),1).over(leadLag), col("last")).as("lagOffset"))
.withColumn("leadOffset", lead(col("offset"),1).over(leadLag).as("leadOffset"))
.filter(col("maxRow")>1)
.write
.partitionBy("deviceReference")
.format("parquet")
.mode(Append)
.save(deviceEventPath)
//Store the second latest event per device as the last will have null as lead and will be set when the next event comes
modBatch.filter(col("maxRow")===2)
.write
.partitionBy("deviceReference")
.format("json")
.mode(Overwrite)
.save(batchCachePath)
}
.trigger(Trigger.Once())
.start()
.awaitTermination()
Отражения:
- Это кажется очень хрупким и неудобным, как будто я испортил состояние в batchCache DF, у меня будет беспорядок. Мне бы хотелось использовать встроенные функции в качестве контрольных точек, но я не могу понять, как заставить их работать для меня, когда мне нужно изменить объем пакета, чтобы исключить последнюю запись для устройства.
- Очевидно, я можно применить задержку опережений к целевой таблице потокового задания, но это сделает вычисления намного больше, избыточнее, и это также приведет к обновлению данных, если только последняя запись на устройство не будет обрезана (та, где опережает ноль).
Вопросы:
- Я думаю, что мне здесь чего-то не хватает. Поиск соседних событий для события очень важен для мер, которые я хочу рассчитать, и я уверен, что я не единственный. Если бы у всех устройств были свои собственные разделы, я думаю, что проблема была бы решена, но, поскольку механизм разделения в Event Hub хэширует, я не знаю, как это можно было бы применить. Может кто-нибудь сказать мне, если я атакую это не в том месте?
- Если я не атакую это не в том месте, какие функции Spark я не использую, чтобы использовать его для повышения надежности?
Приветствия