Сравнение дат в Java Spark Dataframe - PullRequest
0 голосов
/ 04 августа 2020

У меня есть приведенный ниже фрейм данных / набор данных Spark. Столбец_2 имеет даты в строковом формате.

Column_1 Column_2
A        2020-08-05
B        2020-08-01
B        2020-09-20
B        2020-12-31
C        2020-05-10

Мой ожидаемый выходной фрейм данных должен иметь только одну строку для каждого значения в столбце_1, и если в столбце_2 есть несколько дат для того же ключа в столбце_1, тогда следующая доступная дата должна быть выбрал. если есть только одна строка, то дату следует сохранить.

Ожидаемый результат:

Column_1 Column_2
A        2020-08-05
B        2020-09-20
C        2020-05-10

Есть ли способ добиться этой Java искры? возможно без использования UDF?

Ответы [ 3 ]

1 голос
/ 04 августа 2020

Возможно, это поможет -

   dataset.show(false);
        dataset.printSchema();
        /**
         *+--------+----------+
         * |Column_1|Column_2  |
         * +--------+----------+
         * |A       |2020-08-05|
         * |D       |2020-08-01|
         * |D       |2020-08-02|
         * |B       |2020-08-01|
         * |B       |2020-09-20|
         * |B       |2020-12-31|
         * |C       |2020-05-10|
         * +--------+----------+
         *
         * root
         *  |-- Column_1: string (nullable = true)
         *  |-- Column_2: string (nullable = true)
         */

        dataset.withColumn("Column_2", to_date(col("Column_2")))
                .withColumn("count", count("Column_2").over(Window.partitionBy("Column_1")))
                .withColumn("positive", when(col("count").gt(1),
                        when(col("Column_2").gt(current_date()), col("Column_2"))
                ).otherwise(col("Column_2")))
                .withColumn("negative", when(col("count").gt(1),
                        when(col("Column_2").lt(current_date()), col("Column_2"))
                ).otherwise(col("Column_2")))
                .groupBy("Column_1")
                .agg(min("positive").as("positive"), max("negative").as("negative"))
                .selectExpr("Column_1", "coalesce(positive, negative) as Column_2")
                .show(false);
        /**
         * +--------+----------+
         * |Column_1|Column_2  |
         * +--------+----------+
         * |A       |2020-08-05|
         * |D       |2020-08-02|
         * |B       |2020-09-20|
         * |C       |2020-05-10|
         * +--------+----------+
         */
0 голосов
/ 04 августа 2020

SCALA: Это даст результат.

import org.apache.spark.sql.expressions.Window

val w = Window.partitionBy("Column_1")

df.withColumn("count", count("Column_2").over(w))
  .withColumn("later", expr("IF(Column_2 > date(current_timestamp), True, False)"))
  .filter("count = 1 or (count != 1 and later = True)")
  .groupBy("Column_1")
  .agg(min("Column_2").alias("Column_2"))
  .orderBy("Column_1")
  .show(false)

+--------+----------+
|Column_1|Column_2  |
+--------+----------+
|A       |2020-08-05|
|B       |2020-09-20|
|C       |2020-05-10|
+--------+----------+

Он имеет исключение, если количество дат для Column_1 больше, чем 1 и нет даты после current_timestamp, это не даст результата для значения Column_1.

0 голосов
/ 04 августа 2020

Сначала создайте DataFrame

df_b = spark.createDataFrame([("A","2020-08-05"),("B","2020-08-01"),("B","2020-09-20"),("B","2020-12-31"),("C","2020-05-10")],[ "col1","col2"])
_w = W.partitionBy("col1").orderBy("col1")
df_b = df_b.withColumn("rn", F.row_number().over(_w))

Здесь c logi, чтобы выбрать второй элемент каждой группы, если какая-либо группа имеет более одной строки. Для этого мы можем сначала присвоить номер строки каждой группе, и мы выберем первый элемент каждой группы, где количество строк равно 1, и первые 2 строки каждой группы, где количество строк больше 1 в каждой группе.

case = F.expr("""
            CASE WHEN rn =1 THEN 1
                    WHEN rn =2 THEN 1
              END""")

df_b = df_b.withColumn('case_condition', case)
df_b = df_b.filter(F.col("case_condition") == F.lit("1")) 

Промежуточный вывод

+----+----------+---+--------------+
|col1|      col2| rn|case_condition|
+----+----------+---+--------------+
|   B|2020-08-01|  1|             1|
|   B|2020-09-20|  2|             1|
|   C|2020-05-10|  1|             1|
|   A|2020-08-05|  1|             1|
+----+----------+---+--------------+

Теперь, наконец, просто возьмите последний элемент каждой группы -

df = df_b.groupBy("col1").agg(F.last("col2").alias("col2")).orderBy("col1")
df.show()
+----+----------+
|col1|      col2|
+----+----------+
|   A|2020-08-05|
|   B|2020-09-20|
|   C|2020-05-10|
+----+----------+
...