Scala: Как сгруппировать по временной метке Iterable [T] в Iterable [T] - PullRequest
0 голосов
/ 02 июня 2018

Я хотел бы написать код, который бы группировал входные данные итератора строки: Iterator[InputRow] по отметке времени уникальных элементов (по unit и eventName), т. Е. eventTime должна быть самой последней отметкой времени в новой Iterator[T] список, где InputRow определяется как

case class InputRow(unit:Int, eventName: String, eventTime:java.sql.Timestamp, value: Int)

Пример данных до группировки:

+-----------------------+----+---------+-----+
|eventTime              |unit|eventName|value|
+-----------------------+----+---------+-----+
|2018-06-02 16:05:11    |2   |B        |1    |
|2018-06-02 16:05:12    |1   |A        |2    |
|2018-06-02 16:05:13    |2   |A        |2    |
|2018-06-02 16:05:14    |1   |A        |3    |
|2018-06-02 16:05:15    |2   |A        |3    |

После:

+-----------------------+----+---------+-----+
|eventTime              |unit|eventName|value|
+-----------------------+----+---------+-----+
|2018-06-02 16:05:11    |2   |B        |1    |
|2018-06-02 16:05:14    |1   |A        |3    |
|2018-06-02 16:05:15    |2   |A        |3    |

Что такое хороший подход к написаниюприведенный выше код в Scala?

Ответы [ 3 ]

0 голосов
/ 02 июня 2018

Это можно сделать с помощью foldLeft и map:

val grouped: Map[(Int, String), InputRow] = 
  rows
    .foldLeft(Map.empty[(Int, String), Seq[InputRow]])({ case (acc, row) =>
     val key = (row.unit, row.eventName)
     // Get from the accumulator the Seq that already exists or Nil if
     // this key has never been seen before
     val value = acc.getOrElse(key, Nil)
     // Update the accumulator
     acc + (key -> (value :+ row))
  })
  // Get the last element from the list of rows when grouped by unit and event.
  .map({case (k, v) => k -> v.last})

Это предполагает, что eventTime уже сохранены в отсортированном порядке.Если это неверное предположение, вы можете определить implicit Ordering для java.sql.Timestamp и заменить v.last на v.maxBy(_.eventTime).

См. здесь .

Редактировать

Или использовать .groupBy(row => (row.unit, row.eventName)) вместо foldLeft:

implicit val ordering: Ordering[Timestamp] = _ compareTo _
val grouped = rows.groupBy(row => (row.unit, row.eventName))
                  .values
                  .map(_.maxBy(_.eventTime))
0 голосов
/ 02 июня 2018

Вы можете использовать struct встроенную функцию для объединения eventTime и value столбца как struct, так что max на eventTime (самое последнее) может быть взято, когда groupBy unit и eventName и агрегирование , что должно дать вам желаемый результат

import org.apache.spark.sql.functions._
df.withColumn("struct", struct("eventTime", "value"))
    .groupBy("unit", "eventName")
    .agg(max("struct").as("struct"))
    .select(col("struct.eventTime"), col("unit"), col("eventName"), col("struct.value"))

как

+-------------------+----+---------+-----+
|eventTime          |unit|eventName|value|
+-------------------+----+---------+-----+
|2018-06-02 16:05:14|1   |A        |3    |
|2018-06-02 16:05:11|2   |B        |1    |
|2018-06-02 16:05:15|2   |A        |3    |
+-------------------+----+---------+-----+
0 голосов
/ 02 июня 2018

Хорошие новости: ваш вопрос уже содержит глаголы, которые соответствуют функциональным вызовам, которые будут использоваться в коде: группировать по, сортировать по (последней отметке времени).

сортировать InputRow по последней отметке времени,нам понадобится неявное упорядочение:

implicit val rowSortByTimestamp: Ordering[InputRow] = 
    (r1: InputRow, r2: InputRow) => r1.eventTime.compareTo(r2.eventTime)
// or shorter:
// implicit val rowSortByTimestamp: Ordering[InputRow] = 
//   _.eventTime compareTo _.eventTime

А теперь, имея

val input: Iterator[InputRow] = // input data

Давайте сгруппируем их по (unit, eventName)

val result = input.toSeq.groupBy(row => (row.unit, row.eventName))

, затем извлечемтот, который имеет самую последнюю метку времени

  .map { case (gr, rows) => rows.sorted.last }

и сортируется от самого раннего до самого последнего

  .toSeq.sorted

Результат -

InputRow(2,B,2018-06-02 16:05:11.0,1)
InputRow(1,A,2018-06-02 16:05:14.0,3)
InputRow(2,A,2018-06-02 16:05:15.0,3)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...