Группировка нескольких столбцов без агрегации - PullRequest
0 голосов
/ 01 марта 2019

У меня есть фрейм данных (Dataset<Row>), в котором шесть столбцов, из шести необходимо сгруппировать четыре, а для двух других столбцов он может повторять сгруппированные столбцы n раз в зависимости от изменяющегося значения в этих двухстолбцы.

требуемый набор данных, как показано ниже:

 id | batch  | batch_Id | session_name | time          | value
 001|  abc   |   098    |    course-I  | 1551409926133 |  2.3
 001|  abc   |   098    |    course-I  | 1551404747843 |  7.3
 001|  abc   |   098    |    course-I  | 1551409934220 |  6.3

Я устал что-то вроде ниже

Dataset<Row> df2 = df.select("*")
    .groupBy(col("id"), col("batch_Id"), col("session_name"))
    .agg(max("time"));

Я добавил agg, чтобы получить групповой вывод, но не знаюкак этого добиться.

Помощь высоко ценится ... Спасибо.

1 Ответ

0 голосов
/ 01 марта 2019

Я не думаю, что вы были слишком далеко.

Учитывая ваш первый набор данных:

+---+-----+--------+------------+-------------+-----+
| id|batch|batch_Id|session_name|         time|value|
+---+-----+--------+------------+-------------+-----+
|001|  abc|     098|    course-I|1551409926133|  2.3|
|001|  abc|     098|    course-I|1551404747843|  7.3|
|001|  abc|     098|    course-I|1551409934220|  6.3|
|002|  def|     097|   course-II|1551409926453|  2.3|
|002|  def|     097|   course-II|1551404747843|  7.3|
|002|  def|     097|   course-II|1551409934220|  6.3|
+---+-----+--------+------------+-------------+-----+

И предполагая, что ваш желаемый результат:

+---+--------+------------+-------------+
| id|batch_Id|session_name|    max(time)|
+---+--------+------------+-------------+
|002|     097|   course-II|1551409934220|
|001|     098|    course-I|1551409934220|
+---+--------+------------+-------------+

Я бы написал следующий код для агрегации:

Dataset<Row> maxValuesDf = rawDf.select("*")
    .groupBy(col("id"), col("batch_id"), col("session_name"))
    .agg(max("time"));

И все приложение будет выглядеть так:

package net.jgp.books.spark.ch13.lab900_max_value;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.max;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MaxValueAggregationApp {
  /**
   * main() is your entry point to the application.
   * 
   * @param args
   */
  public static void main(String[] args) {
    MaxValueAggregationApp app = new MaxValueAggregationApp();
    app.start();
  }

  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Aggregates max values")
        .master("local[*]")
        .getOrCreate();

    // Reads a CSV file with header, called books.csv, stores it in a
    // dataframe
    Dataset<Row> rawDf = spark.read().format("csv")
        .option("header", true)
        .option("sep", "|")
        .load("data/misc/courses.csv");

    // Shows at most 20 rows from the dataframe
    rawDf.show(20);

    // Performs the aggregation, grouping on columns id, batch_id, and
    // session_name
    Dataset<Row> maxValuesDf = rawDf.select("*")
        .groupBy(col("id"), col("batch_id"), col("session_name"))
        .agg(max("time"));
    maxValuesDf.show(5);
  }
}

Помогает ли это?

...