Как интерполировать столбец внутри сгруппированного объекта в PySpark? - PullRequest
0 голосов
/ 10 февраля 2019

Как вы интерполируете фрейм данных PySpark в сгруппированные данные?

Например:

У меня есть фрейм данных PySpark со следующими столбцами:

+--------+-------------------+--------+
|webID   |timestamp          |counts  |
+--------+-------------------+--------+
|John    |2018-02-01 03:00:00|60      |
|John    |2018-02-01 03:03:00|66      |
|John    |2018-02-01 03:05:00|70      |
|John    |2018-02-01 03:08:00|76      |
|Mo      |2017-06-04 01:05:00|10      |
|Mo      |2017-06-04 01:07:00|20      |
|Mo      |2017-06-04 01:10:00|35      |
|Mo      |2017-06-04 01:11:00|40      |
+--------+----------------- -+--------+

Мне нужноинтерполировать данные счета Джона и Мо в точку данных каждую минуту в пределах их собственного интервала.Я открыт для любой простой линейной интерполяции - но учтите, что мои реальные данные каждые несколько секунд, и я хочу интерполировать каждую секунду.

Таким образом, результат должен быть:

+--------+-------------------+--------+
|webID   |timestamp          |counts  |
+--------+-------------------+--------+
|John    |2018-02-01 03:00:00|60      |
|John    |2018-02-01 03:01:00|62      |
|John    |2018-02-01 03:02:00|64      |
|John    |2018-02-01 03:03:00|66      |
|John    |2018-02-01 03:04:00|68      |
|John    |2018-02-01 03:05:00|70      |
|John    |2018-02-01 03:06:00|72      |
|John    |2018-02-01 03:07:00|74      |
|John    |2018-02-01 03:08:00|76      |
|Mo      |2017-06-04 01:05:00|10      |
|Mo      |2017-06-04 01:06:00|15      |
|Mo      |2017-06-04 01:07:00|20      |
|Mo      |2017-06-04 01:08:00|25      |
|Mo      |2017-06-04 01:09:00|30      |
|Mo      |2017-06-04 01:10:00|35      |
|Mo      |2017-06-04 01:11:00|40      |
+--------+----------------- -+--------+

Новыйстроки должны быть добавлены к моему исходному фрейму данных.Ищете решение PySpark.

Ответы [ 2 ]

0 голосов
/ 10 февраля 2019

Если вы используете Python, самый короткий способ добиться цели - это повторно использовать существующие функции Pandas, используя GROUPED_MAP udf:

from operator import attrgetter
from pyspark.sql.types import StructType
from pyspark.sql.functions import pandas_udf, PandasUDFType

def resample(schema, freq, timestamp_col = "timestamp",**kwargs):
    @pandas_udf(
        StructType(sorted(schema, key=attrgetter("name"))), 
        PandasUDFType.GROUPED_MAP)
    def _(pdf):
        pdf.set_index(timestamp_col, inplace=True)
        pdf = pdf.resample(freq).interpolate()
        pdf.ffill(inplace=True)
        pdf.reset_index(drop=False, inplace=True)
        pdf.sort_index(axis=1, inplace=True)
        return pdf
    return _

, примененные к вашим данным:

from pyspark.sql.functions import to_timestamp

df = spark.createDataFrame([
    ("John",   "2018-02-01 03:00:00", 60),  
    ("John",   "2018-02-01 03:03:00", 66),  
    ("John",   "2018-02-01 03:05:00", 70),  
    ("John",   "2018-02-01 03:08:00", 76),  
    ("Mo",     "2017-06-04 01:05:00", 10),  
    ("Mo",     "2017-06-04 01:07:00", 20),  
    ("Mo",     "2017-06-04 01:10:00", 35),  
    ("Mo",     "2017-06-04 01:11:00", 40),
], ("webID", "timestamp", "counts")).withColumn(
  "timestamp", to_timestamp("timestamp")
)

df.groupBy("webID").apply(resample(df.schema, "60S")).show()

это дает

+------+-------------------+-----+
|counts|          timestamp|webID|
+------+-------------------+-----+
|    60|2018-02-01 03:00:00| John|
|    62|2018-02-01 03:01:00| John|
|    64|2018-02-01 03:02:00| John|
|    66|2018-02-01 03:03:00| John|
|    68|2018-02-01 03:04:00| John|
|    70|2018-02-01 03:05:00| John|
|    72|2018-02-01 03:06:00| John|
|    74|2018-02-01 03:07:00| John|
|    76|2018-02-01 03:08:00| John|
|    10|2017-06-04 01:05:00|   Mo|
|    15|2017-06-04 01:06:00|   Mo|
|    20|2017-06-04 01:07:00|   Mo|
|    25|2017-06-04 01:08:00|   Mo|
|    30|2017-06-04 01:09:00|   Mo|
|    35|2017-06-04 01:10:00|   Mo|
|    40|2017-06-04 01:11:00|   Mo|
+------+-------------------+-----+

Это работает в предположении, что и входные, и интерполированные данные для одного webID могут поместиться в памяти одного узла (как правило, другие точные, не итерационные решения будутдолжны сделать аналогичные предположения).Если это не так, вы можете легко приблизиться, взяв перекрывающиеся окна

partial = (df
    .groupBy("webID", window("timestamp", "5 minutes", "3 minutes")["start"])
    .apply(resample(df.schema, "60S")))

и агрегировав окончательный результат

from pyspark.sql.functions import mean

(partial
    .groupBy("webID", "timestamp")
    .agg(mean("counts")
    .alias("counts"))
    # Order by key and timestamp, only for consistent presentation
    .orderBy("webId", "timestamp")
    .show())

Это, конечно, намного дороже (есть два шаффла инекоторые значения будут вычисляться несколько раз), но также могут оставлять пропуски, если перекрытие недостаточно велико для включения следующего наблюдения.

+-----+-------------------+------+
|webID|          timestamp|counts|
+-----+-------------------+------+
| John|2018-02-01 03:00:00|  60.0|
| John|2018-02-01 03:01:00|  62.0|
| John|2018-02-01 03:02:00|  64.0|
| John|2018-02-01 03:03:00|  66.0|
| John|2018-02-01 03:04:00|  68.0|
| John|2018-02-01 03:05:00|  70.0|
| John|2018-02-01 03:08:00|  76.0|
|   Mo|2017-06-04 01:05:00|  10.0|
|   Mo|2017-06-04 01:06:00|  15.0|
|   Mo|2017-06-04 01:07:00|  20.0|
|   Mo|2017-06-04 01:08:00|  25.0|
|   Mo|2017-06-04 01:09:00|  30.0|
|   Mo|2017-06-04 01:10:00|  35.0|
|   Mo|2017-06-04 01:11:00|  40.0|
+-----+-------------------+------+
0 голосов
/ 10 февраля 2019

Это не решение Python, но я полагаю, что решение Scala ниже может быть реализовано с использованием аналогичного подхода в Python.Он включает использование функции lag Window для создания временного диапазона в каждой строке и UDF, который расширяет временной диапазон с помощью API java.time в список временных рядов per-minute и интерполированных отсчетов, которые затем сглаживаются с помощью Spark.explode метод:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val df = Seq(
  ("John", "2018-02-01 03:00:00", 60),
  ("John", "2018-02-01 03:03:00", 66),
  ("John", "2018-02-01 03:05:00", 70),
  ("Mo", "2017-06-04 01:07:00", 20),
  ("Mo", "2017-06-04 01:10:00", 35),
  ("Mo", "2017-06-04 01:11:00", 40)
).toDF("webID", "timestamp", "count")

val winSpec = Window.partitionBy($"webID").orderBy($"timestamp")

def minuteList(timePattern: String) = udf{ (ts1: String, ts2: String, c1: Int, c2: Int) =>
  import java.time.LocalDateTime
  import java.time.format.DateTimeFormatter

  val timeFormat = DateTimeFormatter.ofPattern(timePattern)

  val perMinTS = if (ts1 == ts2) Vector(ts1) else {
      val t1 = LocalDateTime.parse(ts1, timeFormat)
      val t2 = LocalDateTime.parse(ts2, timeFormat)
      Iterator.iterate(t1.plusMinutes(1))(_.plusMinutes(1)).takeWhile(! _.isAfter(t2)).
        map(_.format(timeFormat)).
        toVector
    }

  val sz = perMinTS.size

  val perMinCount = for { i <- 1 to sz } yield c1 + ((c2 - c1) * i / sz)

  perMinTS zip perMinCount
}

df.
  withColumn("timestampPrev", when(row_number.over(winSpec) === 1, $"timestamp").
    otherwise(lag($"timestamp", 1).over(winSpec))).
  withColumn("countPrev", when(row_number.over(winSpec) === 1, $"count").
    otherwise(lag($"count", 1).over(winSpec))).
  withColumn("minuteList",
    minuteList("yyyy-MM-dd HH:mm:ss")($"timestampPrev", $"timestamp", $"countPrev", $"count")).
  withColumn("minute", explode($"minuteList")).
  select($"webID", $"minute._1".as("timestamp"), $"minute._2".as("count")).
  show
// +-----+-------------------+-----+
// |webID|          timestamp|count|
// +-----+-------------------+-----+
// | John|2018-02-01 03:00:00|   60|
// | John|2018-02-01 03:01:00|   62|
// | John|2018-02-01 03:02:00|   64|
// | John|2018-02-01 03:03:00|   66|
// | John|2018-02-01 03:04:00|   68|
// | John|2018-02-01 03:05:00|   70|
// |   Mo|2017-06-04 01:07:00|   20|
// |   Mo|2017-06-04 01:08:00|   25|
// |   Mo|2017-06-04 01:09:00|   30|
// |   Mo|2017-06-04 01:10:00|   35|
// |   Mo|2017-06-04 01:11:00|   40|
// +-----+-------------------+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...