Добавить новую запись перед другой в Spark - PullRequest
0 голосов
/ 04 марта 2019

У меня есть датафрейм:

| ID | TIMESTAMP | VALUE |
  1     15:00:01    3
  1     17:04:02    2

Я хочу добавить новую запись с помощью Spark-Scala ранее с тем же временем минус 1 секунда, когда значение равно 2.

Выходбудет:

| ID | TIMESTAMP | VALUE |
  1     15:00:01    3
  1     17:04:01    2
  1     17:04:02    2

Спасибо

Ответы [ 2 ]

0 голосов
/ 04 марта 2019

Вам нужно .flatMap()

Аналогично карте, но каждый элемент ввода может быть сопоставлен с 0 или более элементами вывода (поэтому func должен возвращать Seq, а неодин элемент).

val data = (spark.createDataset(Seq(
    (1, "15:00:01", 3),
    (1, "17:04:02", 2)
  )).toDF("ID", "TIMESTAMP_STR", "VALUE")
  .withColumn("TIMESTAMP", $"TIMESTAMP_STR".cast("timestamp").as("TIMESTAMP"))
  .drop("TIMESTAMP_STR")
  .select("ID", "TIMESTAMP", "VALUE")
)

data.as[(Long, java.sql.Timestamp, Long)].flatMap(r => {
  if(r._3 == 2) {
    Seq(
      (r._1, new java.sql.Timestamp(r._2.getTime() - 1000L), r._3),
      (r._1, r._2, r._3)
    )
  } else {
    Some(r._1, r._2, r._3)
  }
}).toDF("ID", "TIMESTAMP", "VALUE").show()

В результате:

+---+-------------------+-----+
| ID|           TIMESTAMP|VALUE|
+---+-------------------+-----+
|  1|2019-03-04 15:00:01|    3|
|  1|2019-03-04 17:04:01|    2|
|  1|2019-03-04 17:04:02|    2|
+---+-------------------+-----+
0 голосов
/ 04 марта 2019

Вы можете ввести новый массив столбцов - когда значение = 2, затем Array (-1,0) или Array (0), затем взорвать этот столбец и добавить его с отметкой времени в секундах.Ниже следует работать для вас.Проверьте это:

scala> val df = Seq((1,"15:00:01",3),(1,"17:04:02",2)).toDF("id","timestamp","value")
df: org.apache.spark.sql.DataFrame = [id: int, timestamp: string ... 1 more field]

scala> val df2 = df.withColumn("timestamp",'timestamp.cast("timestamp"))
df2: org.apache.spark.sql.DataFrame = [id: int, timestamp: timestamp ... 1 more field]

scala> df2.show(false)
+---+-------------------+-----+
|id |timestamp          |value|
+---+-------------------+-----+
|1  |2019-03-04 15:00:01|3    |
|1  |2019-03-04 17:04:02|2    |
+---+-------------------+-----+


scala> val df3 = df2.withColumn("newc", when($"value"===lit(2),lit(Array(-1,0))).otherwise(lit(Array(0))))
df3: org.apache.spark.sql.DataFrame = [id: int, timestamp: timestamp ... 2 more fields]

scala> df3.show(false)
+---+-------------------+-----+-------+
|id |timestamp          |value|newc   |
+---+-------------------+-----+-------+
|1  |2019-03-04 15:00:01|3    |[0]    |
|1  |2019-03-04 17:04:02|2    |[-1, 0]|
+---+-------------------+-----+-------+


scala> val df4 = df3.withColumn("c_explode",explode('newc)).withColumn("timestamp2",to_timestamp(unix_timestamp('timestamp)+'c_explode))
df4: org.apache.spark.sql.DataFrame = [id: int, timestamp: timestamp ... 4 more fields]

scala> df4.select($"id",$"timestamp2",$"value").show(false)
+---+-------------------+-----+
|id |timestamp2         |value|
+---+-------------------+-----+
|1  |2019-03-04 15:00:01|3    |
|1  |2019-03-04 17:04:01|2    |
|1  |2019-03-04 17:04:02|2    |
+---+-------------------+-----+


scala>

Если вы хотите, чтобы часть времени оставалась одна, то вы можете сделать как

scala> df4.withColumn("timestamp",from_unixtime(unix_timestamp('timestamp2),"HH:mm:ss")).select($"id",$"timestamp",$"value").show(false)
+---+---------+-----+
|id |timestamp|value|
+---+---------+-----+
|1  |15:00:01 |3    |
|1  |17:04:01 |2    |
|1  |17:04:02 |2    |
+---+---------+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...