Как создать UDF с отслеживанием состояния в Spark? - PullRequest
0 голосов
/ 07 мая 2020

Я хочу создать udf in spark для работы со структурой данных Java, которая не вписывается в стиль scala fp и только изменяет ее внутренние состояния. Для упрощения, вот скелет класса java, с которым я работаю.

public class MagicStats {
    List<Long> rawData;

    public void processDataPoint(long dataPoint) {
        rawData.add(dataPoint);
        //some magic processing
    }

    public void merge(MagicStats anotherMagicStats) {
        //merge with another instance to combine states
    }

    public long eval() {
        //do some magic with data
        //return result
    }
}

Еще немного о том, что я пытаюсь сделать с этим классом. У меня есть данные с разбивкой по дням, и для каждого дневного раздела я создам некоторую сводную статистику, включая count, avg, et c и эту специальную MagicStats (которая будет получена eval() в классе) и сохраню их в база данных. Особенностью MagicStats является:

  1. Мне нужен ежедневный результат MagicStats для данных.
  2. Мне нужно ежемесячное агрегирование ежедневных результатов MagicStats (которые нельзя вычислить арифметически на основе результаты за день и могут обрабатываться только классом).

Как видите, второе требование означает, что мне нужно сделать снимок объекта MagicStats для каждого ежедневного раздела и сохранить его как необработанные байты в столбце базы данных, чтобы, когда дело доходит до ежемесячной агрегации, я мог реконструировать все 30 объектов MagicStats в памяти из байтовых массивов и вызвать merge(MagicStats) and then eval() для правильного агрегирования.

Теперь самое сложное. Как создать udf, который не возвращает результат из входного потока, а вместо этого изменяет внутреннее состояние объекта java? Вот где я застрял (псевдокод ниже):

//input_monthly_data
// +----------+------+
// |   day    | value|
// +----------+------+
// |2020-01-01|  3000|
// |2020-01-02|  4500|
// |2020-01-03|  3500|
// |..........|  ....|
// +----------+------+

val df = spark.read.json("input_monthly_data.json")
df.groupby("day").agg(MyUDF(data).as("daily stats").select("daily stats", "avg", "count").saveAsTable()

class MyUDF extends UserDefinedFunction {
    def apply(input: Long): Column = {
        //create a static MagicStats instance
        //update the state of the instance per data point
        //serialize this instance to bytes after done for each day's partition
        //return the bytes for later persistence
    }
}
//output_monthly_data
// +----------+------+-----------------+
// |   day    | count| MagicStats bytes|
// +----------+------+-----------------+
// |2020-01-01|  10  | some binary.    |
// |2020-01-02|  20  | some binary.    |
// |2020-01-03|  25  | some binary.    |
// |..........|  ....| some binary.    |
// +----------+------+-----------------|

Мы будем очень благодарны за любые предложения о том, как заставить эту UDF работать или другой способ достижения моей цели!

1 Ответ

1 голос
/ 09 мая 2020

Я думаю, что, возможно, вы захотите реализовать UserDefinedAggregateFunction, а не UserDefinedFunction.

Чтобы получить совокупный результат, UserDefinedAggregateFunction имеет концепцию обновления состояния для каждой точки данных в заданная группа, которая, кажется, именно то, что вам нужно.

См. эти ссылки для получения дополнительной информации:

...