Что касается манипулирования кадрами данных - PullRequest
0 голосов
/ 07 мая 2018

Я пытаюсь добиться следующего:

  1. Мне даны данные временного ряда с 3 столбцами - «Timestamp, Lag1_Timestamp, MyData», где Lag1_Timestamp - это временная метка с задержкой на 1.

  2. Я должен сначала создать еще один столбец time_diff = Timestamp - Lag1_Timestamp , с ограничением, что всякий раз, когда я сталкиваюсь с MyData = 0, тогда time_diff = 0.

    val df3 = df2.withColumn ("time_diff", когда (df2 ("Timestamp") === 0, 0) .otherwise (когда (df2 ("MyData") === 0, 0) .otherwise ( (df2 ( "Отметка") - df2 ( "Lag1_Timestamp")))))

  3. Как только time_diff рассчитывается, мне нужно рассчитать совокупную сумму следующим образом:

а. Начните кумулятивную сумму с '0', поэтому для отметки времени 0 cum_sum = 0

б. Затем продолжайте находить совокупную сумму для каждой записи. (Предположим, что кадры данных отсортированы по отметке времени).

с. Но всякий раз, когда вы сталкиваетесь со значением time_diff = 0 , вы перезапускаете накопительную сумму до 0 и перезапускаете накопленную сумму с этой точки.

val list = df3.collect()
val cumSum = Array.ofDim[Double](list.length);
for((cur,i) <- list.view.zipWithIndex){
  if(i!=0){
    var prev = list(i-1);
    if(prev(1)!=0 && cur(1)!=0){
       cumSum(i) = cumSum(i-1) + cur(3).asInstanceOf[Double] + prev(3).asInstanceOf[Double]
    }
  }
}
val summing = sc.parallelize(cumSum).toDF("Uptime")
def addIndex(df: DataFrame) = sqlContext.createDataFrame(
  // Add index
  df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
  // Create schema
  StructType(df.schema.fields :+ StructField("_index", LongType, false))
)
// Add indices
val aWithIndex = addIndex(inDF)
val bWithIndex = addIndex(summing)

// Join and clean
val ab1 = aWithIndex
  .join(bWithIndex, Seq("_index")).orderBy(asc("Timestamp"))
  .drop("_index")

Хотя код работает, он очень медленный. У меня вопрос, есть ли лучший способ достичь той же цели, как указано выше.

Спасибо и всего наилучшего

...