Как конвертировать Row в раздел - PullRequest
1 голос
/ 02 марта 2020

У меня есть сценарий в искре. Придется разделить фрейм данных. результат должен обрабатываться каждым разделом одновременно.

List<String> data = Arrays.asList("con_dist_1", "con_dist_2", 
        "con_dist_3", "con_dist_4", "con_dist_5",
        "con_dist_6");
Dataset<Row> codes = sparkSession.createDataset(data, Encoders.STRING());
Dataset<Row> partitioned_codes = codes.repartition(col("codes"));

// I need to paritition it dues to functional requirement
partitioned_codes.foreachPartition(itr -> {
    if (itr.hasNext()) {
        Row inrow = itr.next();
        System.out.println("inrow.length : " + inrow.length());
        System.out.println(inrow.toString());
        List<Object> objs = inrow.getList(0);
    }
});

Ошибка получения

Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to scala.collection.Seq
    at org.apache.spark.sql.Row$class.getSeq(Row.scala:283)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getSeq(rows.scala:166)
    at org.apache.spark.sql.Row$class.getList(Row.scala:291)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getList(rows.scala:166)

Вопрос: Как обрабатывать foreachPartition здесь, где itr каждая итерация состоит из группы строк, как получить эти строки, используя itr?

Тест 1:

inrow.length: 0
[]
inrow.length: 0
[]
2020-03-02 05:22:14,179 [Executor task launch worker for task 615] ERROR org.apache.spark.executor.Executor - Exception in task 110.0 in stage 21.0 (TID 615)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
    at org.apache.spark.sql.Row$class.getString(Row.scala:255)
    at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:166)

Выход-1:

inrow.length: 0
[]
inrow.length: 0
[]
inrow.length: 1
[con_dist_1]
inrow.length: 1
[con_dist_2]
inrow.length: 1
[con_dist_5]
inrow.length: 1
[con_dist_6]
inrow.length: 1
[con_dist_4]
inrow.length: 1
[con_dist_3]

1 Ответ

2 голосов
/ 02 марта 2020

Все строки раздела находятся в itr. Поэтому, когда вы звоните itr.next(), вы получаете только первый ряд. Если вам нужно распечатать все строки, вы можете использовать while l oop, или вы можете преобразовать итератор в список с чем-то вроде этого (я подозреваю, что это то, что вы хотели получить):

partitioned_codes.foreachPartition(itr -> {
    Iterable<Row> rowIt = () -> itr;
    List<String> objs = StreamSupport.stream(rowIt.spliterator(), false)
            .map(row -> row.getString(0))
            .collect(Collectors.toList());

    System.out.println("inrow.length: " + objs.size());
    System.out.println(objs);
});

Пример кода, который вы разместили, не скомпилирован для меня, поэтому вот версия, с которой я тестировал:

List<String> data = Arrays.asList("con_dist_1", "con_dist_2", 
        "con_dist_3", "con_dist_4", "con_dist_5",
        "con_dist_6");
StructType struct = new StructType()
        .add(DataTypes.createStructField("codes", DataTypes.StringType, true));
Dataset<Row> codes = sparkSession.createDataFrame(sc.parallelize(data, 2)
                        .map(s -> RowFactory.create(s)), struct);
Dataset<Row> partitioned_codes = codes.repartition(org.apache.spark.sql.functions.col("codes"));

partitioned_codes.foreachPartition(itr -> {
    Iterable<Row> rowIt = () -> itr;
    List<String> objs = StreamSupport.stream(rowIt.spliterator(), false)
            .map(row -> row.getString(0))
            .collect(Collectors.toList());

    System.out.println("inrow.length: " + objs.size());
    System.out.println(objs);
});
...