Как отфильтровать искровой фрейм данных на основе появления значения в столбце с условием столбец даты? - PullRequest
0 голосов
/ 22 ноября 2018
Команда

, я работаю с фреймом данных, выглядит следующим образом:

    df
    client   | date   
      C1     |08-NOV-18 11.29.43
      C2     |09-NOV-18 13.29.43
      C2     |09-NOV-18 18.29.43
      C3     |11-NOV-18 19.29.43
      C1     |12-NOV-18 10.29.43
      C2     |13-NOV-18 09.29.43
      C4     |14-NOV-18 20.29.43
      C1     |15-NOV-18 11.29.43
      C5     |16-NOV-18 15.29.43
      C10    |17-NOV-18 19.29.43
      C1     |18-NOV-18 12.29.43
      C2     |18-NOV-18 10.29.43
      C2     |19-NOV-18 09.29.43
      C6     |20-NOV-18 13.29.43
      C6     |21-NOV-18 14.29.43
      C1     |21-NOV-18 18.29.43
      C1     |22-NOV-18 11.29.43

Моя цель - отфильтровать этот фрейм данных и получить новый фрейм данных, содержащий последние два вхождения каждого клиента, если это вхождение <из 24например, для этого примера результат должен быть: </p>

     client  |date
      C2     |18-NOV-18 10.29.43
      C2     |19-NOV-18 09.29.43
      C1     |21-NOV-18 18.29.43
      C1     |22-NOV-18 11.29.43

любая помощь, пожалуйста!

Ответы [ 3 ]

0 голосов
/ 22 ноября 2018

Использование оконных функций.Проверьте это:

val df = Seq(("C1","08-NOV-18 11.29.43"),
  ("C2","09-NOV-18 13.29.43"),
  ("C2","09-NOV-18 18.29.43"),
  ("C3","11-NOV-18 19.29.43"),
  ("C1","12-NOV-18 10.29.43"),
  ("C2","13-NOV-18 09.29.43"),
  ("C4","14-NOV-18 20.29.43"),
  ("C1","15-NOV-18 11.29.43"),
  ("C5","16-NOV-18 15.29.43"),
  ("C10","17-NOV-18 19.29.43"),
  ("C1","18-NOV-18 12.29.43"),
  ("C2","18-NOV-18 10.29.43"),
  ("C2","19-NOV-18 09.29.43"),
  ("C6","20-NOV-18 13.29.43"),
  ("C6","21-NOV-18 14.29.43"),
  ("C1","21-NOV-18 18.29.43"),
  ("C1","22-NOV-18 11.29.43")).toDF("client","dt").withColumn("dt",from_unixtime(unix_timestamp('dt,"dd-MMM-yy HH.mm.ss"),"yyyy-MM-dd HH:mm:ss"))

df.createOrReplaceTempView("tbl")

val df2 = spark.sql(""" select * from ( select client, dt, count(*) over(partition by client ) cnt, rank() over(partition by client order by dt desc) rk1  from tbl ) t where cnt>1 and rk1 in (1,2) """)

df2.alias("t1").join(df2.alias("t2"), $"t1.client" === $"t2.client" and $"t1.rk1" =!= $"t2.rk1" , "inner" ).withColumn("dt24",(unix_timestamp($"t1.dt") - unix_timestamp($"t2.dt") )/ 3600 ).where("dt24 > -24 and dt24 < 24").select($"t1.client", $"t1.dt").show(false)

Результаты:

+------+-------------------+
|client|dt                 |
+------+-------------------+
|C1    |2018-11-22 11:29:43|
|C1    |2018-11-21 18:29:43|
|C2    |2018-11-19 09:29:43|
|C2    |2018-11-18 10:29:43|
+------+-------------------+
0 голосов
/ 22 ноября 2018

С помощью функции Window можно найти следующие / предыдущие даты, а затем отфильтрованные строки, в которых разница между датами превышает 24 часа.

Подготовка данных

val df = Seq(("C1", "08-NOV-18 11.29.43"),
  ("C2", "09-NOV-18 13.29.43"),
  ("C2", "09-NOV-18 18.29.43"),
  ("C3", "11-NOV-18 19.29.43"),
  ("C1", "12-NOV-18 10.29.43"),
  ("C2", "13-NOV-18 09.29.43"),
  ("C4", "14-NOV-18 20.29.43"),
  ("C1", "15-NOV-18 11.29.43"),
  ("C5", "16-NOV-18 15.29.43"),
  ("C10", "17-NOV-18 19.29.43"),
  ("C1", "18-NOV-18 12.29.43"),
  ("C2", "18-NOV-18 10.29.43"),
  ("C2", "19-NOV-18 09.29.43"),
  ("C6", "20-NOV-18 13.29.43"),
  ("C6", "21-NOV-18 14.29.43"),
  ("C1", "21-NOV-18 18.29.43"),
  ("C1", "22-NOV-18 11.29.43"))
  .toDF("client", "dt")
  .withColumn("dt", to_timestamp($"dt", "dd-MMM-yy HH.mm.ss"))

Действующий код

// get next/prev dates
val dateWindow = Window.partitionBy("client").orderBy("dt")
val withNextPrevDates = df
  .withColumn("previousDate", lag($"dt", 1).over(dateWindow))
  .withColumn("nextDate", lead($"dt", 1).over(dateWindow))

// function for filter
val secondsInDay = TimeUnit.DAYS.toSeconds(1)
val dateDiffLessThanDay = (startTimeStamp: Column, endTimeStamp: Column) =>
  endTimeStamp.cast(LongType) - startTimeStamp.cast(LongType) < secondsInDay && datediff(endTimeStamp, startTimeStamp) === 1

// filter
val result = withNextPrevDates
  .where(dateDiffLessThanDay($"previousDate", $"dt") || dateDiffLessThanDay($"dt", $"nextDate"))
  .drop("previousDate", "nextDate")

Результат

+------+-------------------+
|client|dt                 |
+------+-------------------+
|C1    |2018-11-21 18:29:43|
|C1    |2018-11-22 11:29:43|
|C2    |2018-11-18 10:29:43|
|C2    |2018-11-19 09:29:43|
+------+-------------------+
0 голосов
/ 22 ноября 2018

У меня есть одно решение для этого сценария:

  val milliSecForADay = 24 * 60 * 60 * 1000

  val filterDatesUDF = udf { arr: scala.collection.mutable.WrappedArray[Timestamp] =>
    arr.sortWith(_ after _).toList match {
      case last :: secondLast :: _ if (last.getTime - secondLast.getTime) < milliSecForADay => Array(secondLast, last)
      case _ => Array.empty[Timestamp]
    }
  }

  val finalDF = df.groupBy("client")
    .agg(collect_list("date").as("dates"))
    .select(col("client"), explode(filterDatesUDF(col("dates"))).as("date"))
    .show()

В этом решении, во-первых, я группирую данные на основе клиента, используя user-defined function или udf для обработки меток времени, сгруппированных для каждогоclient.

Это делается, предполагая, что столбец date уже в формате Timestamp, что, я думаю, может быть неверным.Если вы получаете столбец date типа String, добавьте следующий код перед приведенным выше решением, чтобы преобразовать тип столбца date из String в Timestamp.

  val stringToTimestampUDF = udf { str: String =>
    val format = new java.text.SimpleDateFormat("dd-MMM-yy hh.mm.ss") //format for "08-NOV-18 11.29.43"
    new Timestamp(format.parse(str).getTime)
  }

  val df = originDF.select(col("client"), to_utc_timestamp(stringToTimestampUDF(col("date")), "utc").as("date"))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...