Как расширить временной диапазон в поминутные интервалы в Spark (Scala или Python)? - PullRequest
0 голосов
/ 06 февраля 2019

У меня есть набор данных, имеющий следующую структуру.

+-------+----------+---------------+---------------+
| tv_id | movie_id |  start_time   |   end_time    |
+-------+----------+---------------+---------------+
| tv123 | movie123 | 02/05/19 3:05 | 02/05/19 3:08 |
| tv234 | movie345 | 02/05/19 3:07 | 02/05/19 3:10 |
+-------+----------+---------------+---------------+

Вывод, который я пытаюсь получить, выглядит следующим образом:

+-------+----------+---------------+
| tv_id | movie_id |    minute     |
+-------+----------+---------------+
| tv123 | movie123 | 02/05/19 3:05 |
| tv123 | movie123 | 02/05/19 3:06 |
| tv123 | movie123 | 02/05/19 3:07 |
| tv234 | movie345 | 02/05/19 3:07 |
| tv234 | movie345 | 02/05/19 3:08 |
| tv234 | movie345 | 02/05/19 3:09 |
+-------+----------+---------------+

Подробное объяснение: для tv_id: tv123общее время просмотра составляет 3 минуты (3:08 - 3: 05), то же самое относится и к другим записям.

Я пытаюсь использовать либо python / Scala /, либо SQL для получения результата.[Нет ограничений на используемый язык] Мой код Python:

df = read_csv('data')
df[minutes_diff] = df['end_time'] - df['start_time']

for i in range(df['minutes_diff']):
    finaldf = df[tv_id] + df[movie_id] + df['start_time'] + df[minutes_diff] + "i"

Я не уверен, как я могу это сделать.Я не очень хорошо разбираюсь в Scala flatmap.Некоторые исследования StackOverflow указали на использование flatmap, но я не уверен, как я могу использовать diff в flatmap вместо агрегации.

Примечание: Я не хочу открывать отдельный поток для SQL и PythonСледовательно, объединяя все это в одном и том же вопросе.Даже решение sql будет очень хорошо для меня.

1 Ответ

0 голосов
/ 06 февраля 2019

Вот решение на основе Scala, использующее UDF, расширяющее временной диапазон с помощью API java.time в список per-minute, который затем сглаживается встроенным в Spark методом explode:

import org.apache.spark.sql.functions._

val df = Seq(
  ("tv123", "movie123", "02/05/19 3:05", "02/05/19 3:08"),
  ("tv234", "movie345", "02/05/19 3:07", "02/05/19 3:10")
).toDF("tv_id", "movie_id", "start_time", "end_time")

def minuteList(timePattern: String) = udf{ (timeS1: String, timeS2: String) =>
  import java.time.LocalDateTime
  import java.time.format.DateTimeFormatter

  val timeFormat = DateTimeFormatter.ofPattern(timePattern)
  val t1 = LocalDateTime.parse(timeS1, timeFormat)
  val t2 = LocalDateTime.parse(timeS2, timeFormat)

  Iterator.iterate(t1)(_.plusMinutes(1)).takeWhile(_ isBefore t2).
    map(_.format(timeFormat)).
    toList
}

df.
  withColumn("minute_list", minuteList("MM/dd/yy H:mm")($"start_time", $"end_time")).
  withColumn("minute", explode($"minute_list")).
  select("tv_id", "movie_id", "minute").
  show(false)
// +-----+--------+-------------+
// |tv_id|movie_id|minute       |
// +-----+--------+-------------+
// |tv123|movie123|02/05/19 3:05|
// |tv123|movie123|02/05/19 3:06|
// |tv123|movie123|02/05/19 3:07|
// |tv234|movie345|02/05/19 3:07|
// |tv234|movie345|02/05/19 3:08|
// |tv234|movie345|02/05/19 3:09|
// +-----+--------+-------------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...