Как сгенерировать агрегацию длинных выражений набора данных Spark в цикле? - PullRequest
0 голосов
/ 19 марта 2019

Я использую Java Spark для статистики набора данных.

Мне нужно объединить набор данных по множеству выражений, чтобы код был длинным и безобразным.Expers имеют некоторые общие логики, могу ли я генерировать expers с помощью циклов?Вот пример кода, в реальном коде сотни строк с повторяющимися кодами:

Dataset<Row> res = ds.groupBy(ds.col("uid")).agg(functions.max(ds.col("create_time")).as("create_time"),functions.sum(functions.when(ds.col("date_diff").$less$eq(30).and(ds.col("call_type").isin(callTypeOut)),
                    ds.col("duration"))).as("caller_call_time_1"),
                          functions.sum(functions.when(ds.col("date_diff").$less$eq(60)
                            .and(ds.col("call_type").isin(callTypeOut)),
                    ds.col("duration"))).as("caller_call_time_2"),
            functions.sum(functions.when(ds.col("date_diff").$less$eq(90)
                            .and(ds.col("call_type").isin(callTypeOut)),
                    ds.col("duration"))).as("caller_call_time_3"),
            functions.sum(functions.when(ds.col("date_diff").$less$eq(120)
                            .and(ds.col("call_type").isin(callTypeOut)),
                    ds.col("duration"))).as("caller_call_time_4"),
            functions.sum(functions.when(ds.col("date_diff").$less$eq(150)
                            .and(ds.col("call_type").isin(callTypeOut)),
                    ds.col("duration"))).as("caller_call_time_5"),
            functions.sum(functions.when(ds.col("date_diff").$less$eq(180)
                            .and(ds.col("call_type").isin(callTypeOut)),
                    ds.col("duration"))).as("caller_call_time_6"))

Я получил решение, подобное этому:

List<Column> exprs = new ArrayList<>();
for (int i = 1; i < 7; i ++ ) {
    exprs.add(functions.sum(functions.when(ds.col("date_diff").$less$eq(30*i)
                        .and(ds.col("call_type").isin(callTypeOut)),
                ds.col("duration"))).as("caller_call_time_"+Integer.toString(i));

}
Dataset<Row> res = ds.groupBy(ds.col("uid")).agg(functions.max(ds.col("create_time")).as("create_time"),
exprs.toArray(new Column[exprs.size()]));

1 Ответ

1 голос
/ 19 марта 2019

Вы могли бы:

  1. Создайте свой фрейм данных (набор данных - это фрейм данных, в противоположность набору данных чего-либо еще) со всеми дополнительными столбцами до, а затем выполните агрегирование по вновь созданным столбцам. Вы можете создать столбцы в цикле.

  2. Создайте UDAF (пользовательскую функцию агрегирования), которая будет обрабатывать ваш пользовательский код в Java.

Надеюсь, это поможет ...

...