Мне пришлось реализовать пакет Windowing, ориентированный на события, с различным количеством имен событий.
Правило следующее: для определенного события, каждый раз, когда оно происходит, мы суммируем все другие события в соответствии с определенными временными окнами.
action1 00:01
action2 00:02
action1 00:03
action3 00:04
action3 00:05
Для указанного набора данных оно должно быть:
window_before: Map(action1 -> 1)
window_after: Map(action1 -> 1, action3 -> 2)
Чтобы достичь этого, мы используем WindowSpec
и пользовательский udaf, который объединяет все счетчики в карту. Udaf необходим, потому что количество имен действий полностью произвольно.
Конечно, сначала UDAF использовал конвертеры катализаторов Spark, которые были ужасно медленными.
Теперь я достиг того, что я считаю приличным оптимумом, когда я просто поддерживаю массив ключей и значений с неизменяемыми списками (меньшее время GC, меньшие накладные расходы итератора), все сериализованные как двоичные, поэтому среда выполнения Scala обрабатывает бокс / распаковывать, а не Spark, используя байтовые массивы вместо строк.
Проблема в том, что некоторые отставшие являются очень проблематичными, и рабочая нагрузка не может быть распараллелена, в отличие от того, что у нас было статическое число столбцов и мы просто суммировали / подсчитывали числовые столбцы.
Я пытался протестировать другую технику, в которой я создал число столбцов, равное максимальному количеству событий, а затем агрегировал их обратно на карту, но количество столбцов в проекции просто убивало искру (легко подумайте о тысяче столбцов) .
Одной из проблем являются огромные отставшие, где большую часть времени один раздел (что-то вроде userid, app) будет в 100 раз дольше, чем медиана, даже если все правильно перераспределено.
Кто-нибудь еще сталкивался с подобной проблемой?
Пример WindowSpec
:
val windowSpec = Window
.partitionBy($"id", $"product_id")
.orderBy("time")
.rangeBetween(-30days, -1)
тогда
df.withColumn("over30days", myUdaf("name", "count").over(windowSpec))
Наивная версия UDAF:
class UDAF[A] {
private var zero: A = ev.zero
val dt = schemaFor[A].dataType
override def bufferSchema: StructType =
StructType(StructField("actions", MapType(StringType, dt) :: Nil)
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
name = row.get(0)
count = row.get(1)
buffer.update(name, buffer.getOrElse(name, ev.zero) + count)
}
}
Моя текущая версия менее читаема, чем предыдущая наивная версия, но фактически делает то же самое, два двоичных массива, чтобы обойти CatalystConverters
.