Добавить столбец индекса в Apache Spark Dataset <Row>, используя Java - PullRequest
0 голосов
/ 16 мая 2019

Приведенный ниже вопрос имеет решение для scala и pyspark, и решение, представленное в этом вопросе, не для последовательных значений индекса.

Spark Dataframe: Как добавить индекс Столбец: Индекс распределенных данных Aka

У меня есть существующий набор данных в Apache-spark, и я хочу выбрать из него несколько строкна основе индекса.Я планирую добавить один столбец индекса, который содержит уникальные значения, начиная с 1, и на основе значений этого столбца я получу строки.Ниже я нашел способ добавить индекс, который использует порядок по:

df.withColumn("index", functions.row_number().over(Window.orderBy("a column")));

Я не хочу использовать порядок по.Мне нужен индекс в том же порядке, в котором они присутствуют в наборе данных.Любая помощь?

1 Ответ

0 голосов
/ 16 мая 2019

Из того, что я понял, вы пытаетесь добавить индекс (с последовательными значениями) в фрейм данных. К сожалению, в Spark нет встроенной функции, которая делает это. Вы можете добавить только увеличивающийся индекс (но не обязательно с последовательными значениями) с помощью df.withColumn ("index", monotonicallyIncreasingId).

Тем не менее, в RDD API существует функция zipWithIndex, которая делает именно то, что вам нужно. Таким образом, мы можем определить функцию, которая преобразует фрейм данных в RDD, добавляет индекс и преобразует его обратно в фрейм данных.

Я не эксперт в искре в Java (scala намного более компактен), так что, возможно, будет лучше. Вот как бы я это сделал.

public static Dataset<Row> zipWithIndex(Dataset<Row> df, String name) {
    JavaRDD<Row> rdd = df.javaRDD().zipWithIndex().map(t -> {
        Row r = t._1;
        Long index = t._2 + 1;
        ArrayList<Object> list = new ArrayList<>();
        r.toSeq().iterator().foreach(x -> list.add(x));
        list.add(index);
        return RowFactory.create(list);
    });
    StructType newSchema = df.schema()
            .add(new StructField(name, DataTypes.LongType, true, null));
    return df.sparkSession().createDataFrame(rdd, newSchema);
}

А вот как вы бы это использовали. Обратите внимание, что встроенная функция искры делает в отличие от того, что делает наш подход.

Dataset<Row> df = spark.range(5)
    .withColumn("index1", functions.monotonicallyIncreasingId());
Dataset<Row> result = zipWithIndex(df, "good_index");
// df
+---+-----------+
| id|     index1|
+---+-----------+
|  0|          0|
|  1| 8589934592|
|  2|17179869184|
|  3|25769803776|
|  4|25769803777|
+---+-----------+

// result
+---+-----------+----------+
| id|     index1|good_index|
+---+-----------+----------+
|  0|          0|         1|
|  1| 8589934592|         2|
|  2|17179869184|         3|
|  3|25769803776|         4|
|  4|25769803777|         5|
+---+-----------+----------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...