Как сохранить порядок данных при выборе различных значений столбца из набора данных - PullRequest
0 голосов
/ 12 марта 2019

У меня есть набор данных, как показано ниже,

+------+------+---------------+
| col1 | col2 |  sum(costs)   |
+------+------+---------------+
|    1 | a    | 3555204326.27 |
|    4 | b    | 22273491.72   |
|    5 | c    | 219175.00     |
|    3 | a    | 219175.00     |
|    2 | c    | 75341433.37   |
+------+------+---------------+

Мне нужно выбрать различные значения col1, и мой результирующий набор данных должен иметь порядок 1, 4, 5, 3, 2 (порядокв котором эти значения доступны в исходном наборе данных).Но заказ становится перетасованным.Есть ли способ поддерживать тот же порядок, что и в наборе данных?Любое предложение в Spark / SQL может быть в порядке.

Этот набор данных может быть получен с помощью следующей последовательности в искре.

df = sqlCtx.createDataFrame(
  [(1, a, 355.27), (4, b, 222.98), (5, c, 275.00), (3, a, 25.00),
   (2, c, 753.37)], ('Col1', 'col2', 'cost'));

Ответы [ 4 ]

1 голос
/ 13 марта 2019

Предполагается, что вы пытаетесь удалить дубликаты в col2 (так как их нет в col1), так что конечный результат будет:

+----+----+---------------+
|col1|col2|            sum|
+----+----+---------------+
|   1|   a|3.55520432627E9|
|   4|   b|  2.227349172E7|
|   5|   c|       219175.0|
+----+----+---------------+

Вы можете добавить столбец индекса как:

df = df.withColumn("__idx", monotonically_increasing_id());

Затем выполните все необходимые преобразования, а затем отбросьте их, как в:

df = df.dropDuplicates("col2").orderBy("__idx").drop("__idx");

Это будет означать:

Шаг 1: загрузкаданные и прочее:

+----+----+---------------+
|col1|col2|            sum|
+----+----+---------------+
|   1|   a|3.55520432627E9|
|   4|   b|  2.227349172E7|
|   5|   c|       219175.0|
|   3|   a|       219175.0|
|   2|   c|  7.534143337E7|
+----+----+---------------+

Шаг 2: добавьте индекс:

+----+----+---------------+-----+
|col1|col2|            sum|__idx|
+----+----+---------------+-----+
|   1|   a|3.55520432627E9|    0|
|   4|   b|  2.227349172E7|    1|
|   5|   c|       219175.0|    2|
|   3|   a|       219175.0|    3|
|   2|   c|  7.534143337E7|    4|
+----+----+---------------+-----+

Шаг 3: преобразования (здесь удалите дуплики в col2) и удалите __idxстолбец:

+----+----+---------------+
|col1|col2|            sum|
+----+----+---------------+
|   1|   a|3.55520432627E9|
|   4|   b|  2.227349172E7|
|   5|   c|       219175.0|
+----+----+---------------+

Код Java может быть:

package net.jgp.books.spark.ch12.lab990_others;

import static org.apache.spark.sql.functions.monotonically_increasing_id;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Keeping the order of rows during transformations.
 * 
 * @author jgp
 */
public class KeepingOrderApp {

  /**
   * main() is your entry point to the application.
   * 
   * @param args
   */
  public static void main(String[] args) {
    KeepingOrderApp app = new KeepingOrderApp();
    app.start();
  }

  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Splitting a dataframe to collect it")
        .master("local")
        .getOrCreate();

    Dataset<Row> df = createDataframe(spark);
    df.show();

    df = df.withColumn("__idx", monotonically_increasing_id());
    df.show();

    df = df.dropDuplicates("col2").orderBy("__idx").drop("__idx");
    df.show();
  }

  private static Dataset<Row> createDataframe(SparkSession spark) {
    StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField(
            "col1",
            DataTypes.IntegerType,
            false),
        DataTypes.createStructField(
            "col2",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "sum",
            DataTypes.DoubleType,
            false) });

    List<Row> rows = new ArrayList<>();
    rows.add(RowFactory.create(1, "a", 3555204326.27));
    rows.add(RowFactory.create(4, "b", 22273491.72));
    rows.add(RowFactory.create(5, "c", 219175.0));
    rows.add(RowFactory.create(3, "a", 219175.0));
    rows.add(RowFactory.create(2, "c", 75341433.37));

    return spark.createDataFrame(rows, schema);
  }
}
1 голос
/ 13 марта 2019

Вы можете добавить еще один столбец, содержащий индекс каждой строки, а затем отсортировать по этому столбцу после «отличного». Вот пример:

import org.apache.spark.sql.functions._

val df = Seq(1, 4, 4, 5, 2)
  .toDF("a")
  .withColumn("id", monotonically_increasing_id())
df.show()
// +---+---+
// |  a| id|
// +---+---+
// |  1|  0|
// |  4|  1|
// |  4|  2|
// |  5|  3|
// |  2|  4|
// +---+---+

df.dropDuplicates("a").sort("id").show()
// +---+---+
// |  a| id|
// +---+---+
// |  1|  0|
// |  4|  1|
// |  5|  3|
// |  2|  4|
// +---+---+

Обратите внимание, что для различения в 1 конкретном столбце вы можете использовать dropDuplicates, если вы хотите контролировать, какую строку вы хотите взять в случае дублирования, используйте groupBy.

0 голосов
/ 12 марта 2019

Я полагаю, что вам нужно переформатировать запрос и использовать группу, вместо того, чтобы различать, как показано в этом ответе SQL: Как сохранить порядок строк с помощью DISTINCT?

0 голосов
/ 12 марта 2019

Вы можете добавить столбец индекса в вашу БД, а затем в своем запросе SQL сделать ORDER BY id

...