Я хочу создать udf in spark для работы со структурой данных Java, которая не вписывается в стиль scala fp и только изменяет ее внутренние состояния. Для упрощения, вот скелет класса java, с которым я работаю.
public class MagicStats {
List<Long> rawData;
public void processDataPoint(long dataPoint) {
rawData.add(dataPoint);
//some magic processing
}
public void merge(MagicStats anotherMagicStats) {
//merge with another instance to combine states
}
public long eval() {
//do some magic with data
//return result
}
}
Еще немного о том, что я пытаюсь сделать с этим классом. У меня есть данные с разбивкой по дням, и для каждого дневного раздела я создам некоторую сводную статистику, включая count, avg, et c и эту специальную MagicStats (которая будет получена eval()
в классе) и сохраню их в база данных. Особенностью MagicStats является:
- Мне нужен ежедневный результат MagicStats для данных.
- Мне нужно ежемесячное агрегирование ежедневных результатов MagicStats (которые нельзя вычислить арифметически на основе результаты за день и могут обрабатываться только классом).
Как видите, второе требование означает, что мне нужно сделать снимок объекта MagicStats для каждого ежедневного раздела и сохранить его как необработанные байты в столбце базы данных, чтобы, когда дело доходит до ежемесячной агрегации, я мог реконструировать все 30 объектов MagicStats в памяти из байтовых массивов и вызвать merge(MagicStats) and then eval()
для правильного агрегирования.
Теперь самое сложное. Как создать udf, который не возвращает результат из входного потока, а вместо этого изменяет внутреннее состояние объекта java? Вот где я застрял (псевдокод ниже):
//input_monthly_data
// +----------+------+
// | day | value|
// +----------+------+
// |2020-01-01| 3000|
// |2020-01-02| 4500|
// |2020-01-03| 3500|
// |..........| ....|
// +----------+------+
val df = spark.read.json("input_monthly_data.json")
df.groupby("day").agg(MyUDF(data).as("daily stats").select("daily stats", "avg", "count").saveAsTable()
class MyUDF extends UserDefinedFunction {
def apply(input: Long): Column = {
//create a static MagicStats instance
//update the state of the instance per data point
//serialize this instance to bytes after done for each day's partition
//return the bytes for later persistence
}
}
//output_monthly_data
// +----------+------+-----------------+
// | day | count| MagicStats bytes|
// +----------+------+-----------------+
// |2020-01-01| 10 | some binary. |
// |2020-01-02| 20 | some binary. |
// |2020-01-03| 25 | some binary. |
// |..........| ....| some binary. |
// +----------+------+-----------------|
Мы будем очень благодарны за любые предложения о том, как заставить эту UDF работать или другой способ достижения моей цели!