Фильтрация DataFrame по сравнению столбцов даты - PullRequest
0 голосов
/ 15 января 2019

Я пытаюсь отфильтровать DataFrame, сравнивая два столбца даты, используя Scala и Spark. На основе отфильтрованного DataFrame сверху выполняются вычисления для вычисления новых столбцов. Упрощенный мой фрейм данных имеет следующую схему:

|-- received_day: date (nullable = true)
|-- finished: int (nullable = true)

Кроме того, я создаю два новых столбца t_start и t_end, которые будут использоваться для фильтрации DataFrame. Они имеют разницу в 10 и 20 дней от исходного столбца received_day:

val dfWithDates= df
      .withColumn("t_end",date_sub(col("received_day"),10))
      .withColumn("t_start",date_sub(col("received_day"),20))

Теперь я хочу иметь новый вычисляемый столбец, который указывает для каждой строки данных, сколько строк в кадре данных в периоде от t_start до t_end. Я думал, что смогу добиться этого следующим образом:

val dfWithCount = dfWithDates
       .withColumn("cnt", lit(
        dfWithDates.filter(
          $"received_day".lt(col("t_end")) 
          && $"received_day".gt(col("t_start"))).count()))

Однако этот счет возвращает только 0, и я считаю, что проблема заключается в аргументе, который я передаю lt и gt.

Из этой проблемы здесь Фильтрация кадра данных искры по дате Я понял, что мне нужно передать строковое значение. Если я пытаюсь использовать жестко закодированные значения, такие как lt(lit("2018-12-15")), то фильтрация работает. Поэтому я попытался привести мои столбцы к StringType:

val dfWithDates= df
      .withColumn("t_end",date_sub(col("received_day"),10).cast(DataTypes.StringType))
      .withColumn("t_start",date_sub(col("received_day"),20).cast(DataTypes.StringType))

Но фильтр все еще возвращает пустой dataFrame. Я бы предположил, что я не правильно обрабатываю тип данных.

Я работаю на Scala 2.11.0 с Spark 2.0.2.

Ответы [ 2 ]

0 голосов
/ 15 января 2019

Да, вы правы. Для $"received_day".lt(col("t_end") каждое значение reveived_day сравнивается со значением t_end текущей строки, а не со всем фреймом данных. Так что каждый раз вы получите ноль в качестве счета. Вы можете решить эту проблему, написав простой UDF. Вот как вы можете решить проблему:

Создание образца входного набора данных :

import org.apache.spark.sql.{Row, SparkSession}
import java.sql.Date
import org.apache.spark.sql.functions._
import spark.implicits._
val df = Seq((Date.valueOf("2018-10-12"),1),
              (Date.valueOf("2018-10-13"),1),
              (Date.valueOf("2018-09-25"),1),
              (Date.valueOf("2018-10-14"),1)).toDF("received_day", "finished")

val dfWithDates= df
  .withColumn("t_start",date_sub(col("received_day"),20))
  .withColumn("t_end",date_sub(col("received_day"),10))
dfWithDates.show()
    +------------+--------+----------+----------+
|received_day|finished|   t_start|     t_end|
+------------+--------+----------+----------+
|  2018-10-12|       1|2018-09-22|2018-10-02|
|  2018-10-13|       1|2018-09-23|2018-10-03|
|  2018-09-25|       1|2018-09-05|2018-09-15|
|  2018-10-14|       1|2018-09-24|2018-10-04|
+------------+--------+----------+----------+

Здесь для 2018-09-25 мы хотим считать 3

Генерация вывода :

val count_udf = udf((received_day:Date) => {
        (dfWithDates.filter((col("t_end").gt(s"$received_day")) && col("t_start").lt(s"$received_day")).count())
    })
    val dfWithCount = dfWithDates.withColumn("count",count_udf(col("received_day")))
    dfWithCount.show()
    +------------+--------+----------+----------+-----+
|received_day|finished|   t_start|     t_end|count|
+------------+--------+----------+----------+-----+
|  2018-10-12|       1|2018-09-22|2018-10-02|    0|
|  2018-10-13|       1|2018-09-23|2018-10-03|    0|
|  2018-09-25|       1|2018-09-05|2018-09-15|    3|
|  2018-10-14|       1|2018-09-24|2018-10-04|    0|
+------------+--------+----------+----------+-----+

Чтобы ускорить вычисления, я бы предложил кэшировать dfWithDates, поскольку для каждой строки повторяется одна и та же операция.

0 голосов
/ 15 января 2019

Вы можете преобразовать значение даты в строку с любым шаблоном, используя DateTimeFormatter

import java.time.format.DateTimeFormatter

date.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...