Как создать WindowSpec для подсчета строк по типу до и после текущей строки? - PullRequest
0 голосов
/ 04 сентября 2018

Мне пришлось реализовать пакет Windowing, ориентированный на события, с различным количеством имен событий.

Правило следующее: для определенного события, каждый раз, когда оно происходит, мы суммируем все другие события в соответствии с определенными временными окнами.

action1 00:01
action2 00:02
action1 00:03
action3 00:04
action3 00:05

Для указанного набора данных оно должно быть:

window_before: Map(action1 -> 1)
window_after: Map(action1 -> 1, action3 -> 2)

Чтобы достичь этого, мы используем WindowSpec и пользовательский udaf, который объединяет все счетчики в карту. Udaf необходим, потому что количество имен действий полностью произвольно.

Конечно, сначала UDAF использовал конвертеры катализаторов Spark, которые были ужасно медленными.

Теперь я достиг того, что я считаю приличным оптимумом, когда я просто поддерживаю массив ключей и значений с неизменяемыми списками (меньшее время GC, меньшие накладные расходы итератора), все сериализованные как двоичные, поэтому среда выполнения Scala обрабатывает бокс / распаковывать, а не Spark, используя байтовые массивы вместо строк.

Проблема в том, что некоторые отставшие являются очень проблематичными, и рабочая нагрузка не может быть распараллелена, в отличие от того, что у нас было статическое число столбцов и мы просто суммировали / подсчитывали числовые столбцы.

Я пытался протестировать другую технику, в которой я создал число столбцов, равное максимальному количеству событий, а затем агрегировал их обратно на карту, но количество столбцов в проекции просто убивало искру (легко подумайте о тысяче столбцов) .

Одной из проблем являются огромные отставшие, где большую часть времени один раздел (что-то вроде userid, app) будет в 100 раз дольше, чем медиана, даже если все правильно перераспределено.

Кто-нибудь еще сталкивался с подобной проблемой?

Пример WindowSpec:

val windowSpec = Window
    .partitionBy($"id", $"product_id")
    .orderBy("time")
    .rangeBetween(-30days, -1)

тогда

df.withColumn("over30days", myUdaf("name", "count").over(windowSpec))

Наивная версия UDAF:

class UDAF[A] {
    private var zero: A = ev.zero
    val dt = schemaFor[A].dataType

    override def bufferSchema: StructType =
        StructType(StructField("actions", MapType(StringType, dt) :: Nil)  

    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        name = row.get(0)
        count = row.get(1)
        buffer.update(name, buffer.getOrElse(name, ev.zero) + count)
    }
}

Моя текущая версия менее читаема, чем предыдущая наивная версия, но фактически делает то же самое, два двоичных массива, чтобы обойти CatalystConverters.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...