Spark RDD Windowing с использованием pyspark - PullRequest
0 голосов
/ 22 ноября 2018

Существует Spark RDD, называемый rdd1.У него есть пара (key, value), и у меня есть список, элементами которого являются tuple(key1,key2).

Я хочу получить rdd2 со строками `((key1, key2), (значение key1 вrdd1, значение ключа 2 в rdd1)).

Может ли кто-нибудь мне помочь?

rdd1:

key1, value1,
key2, value2,
key3, value3

массив: [(key1,key2),(key2,key3)]

Результат:

(key1,key2),value1,value2
(key2,key3),value2,value3

Я пробовал

spark.parallize(array).map(lambda x:)

1 Ответ

0 голосов
/ 23 ноября 2018

скольжение с SCALA против скольжения mllib - две реализации, немного сложновато, но вот оно:

import org.apache.spark.mllib.rdd.RDDFunctions._
val rdd1 = sc.parallelize(Seq(
              ( "key1", "value1"),
              ( "key2", "value2"),
              ( "key3", "value3"),
              ( "key4", "value4"),
              ( "key5", "value5")
          ))
val rdd2 = rdd1.sliding(2)
val rdd3 = rdd2.map(x => (x(0), x(1))) 
val rdd4 = rdd3.map(x => ((x._1._1, x._2._1),x._1._2, x._2._2))  
rdd4.collect

также, следующее, и это, конечно, лучше ...:

val rdd5 = rdd2.map{case Array(x,y) => ((x._1, y._1), x._2, y._2)}
rdd5.collect

возвращает в обоих случаях:

res70: Array[((String, String), String, String)] = Array(((key1,key2),value1,value2), ((key2,key3),value2,value3), ((key3,key4),value3,value4), ((key4,key5),value4,value5))

, который, как я считаю, соответствует вашим потребностям, но не в pyspark.

В Stack Overflow вы можете найти утверждения, что pyspark не имеет эквивалентадля RDD, если вы не "катите свое".Вы можете посмотреть на это Как преобразовать данные с помощью скользящего окна в данные временных рядов в Pyspark .Тем не менее, я бы посоветовал Data Frames с использованием pyspark.sql.functions.lead () и pyspark.sql.functions.lag ().Несколько проще.

...