Как извлечь строковые значения из RDD Array [Array [String]] в spark-shell? - PullRequest
0 голосов
/ 12 февраля 2020

У меня есть массив следующим образом:

 Array[Array[String]] = Array(Array(1,1,1,300,item1), Array(2,1,2,300,item2), Array(3,1,2,300,item3), Array(4,2,3,100,item4), Array(5,1,3,300,item5))

Я хочу извлечь ((1,1)(1,2)(1,2)(2,3)(1,3)) т.е. каждый массив (массив) 2-й и 3-й элементы. Когда я выполняю это преобразование для RDD в spark-shell:

val arr = flat.map(array => (array(0), array(1)))

arr.collect

, тогда я получаю ошибку следующим образом:

[Stage 6:>                                                          (0 + 0) / 2]20/02/12 02:03:01 ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 13)
java.lang.ArrayIndexOutOfBoundsException: 1
    at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:31)
    at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:31)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
    at org.apache.spark.SparkCont

EDIT 1: полный код после того, как я использую первый ответ : Тем не менее я не могу извлечь две строки из массива (Array ())

scala> flat.collect
res3: Array[Array[String]] = Array(Array(1,1,1,300,item1), Array(2,1,2,300,item2), Array(3,1,2,300,item3), Array(4,2,3,100,item4), Array(5,1,3,300,item5))
scala> val parl = sc.parallelize(flat)
<console>:31: error: type mismatch;
 found   : org.apache.spark.rdd.RDD[Array[String]]
 required: Seq[?]
Error occurred in an application involving default arguments.
         val parl = sc.parallelize(flat)
scala> val parl = sc.parallelize(flat.collect)
parl: org.apache.spark.rdd.RDD[Array[String]] = ParallelCollectionRDD[6] at parallelize at <console>:31
scala> parl.collect
res4: Array[Array[String]] = Array(Array(1,1,1,300,item1), Array(2,1,2,300,item2), Array(3,1,2,300,item3), Array(4,2,3,100,item4), Array(5,1,3,300,item5))
scala> val gvk = parl.map(array=>(array(0),array(1)))
gvk: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[7] at map at <console>:33
scala> gvk.collect
20/02/12 03:27:29 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 13)
java.lang.ArrayIndexOutOfBoundsException: 1
    at $line29.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:33)
    at $line29.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:33)

1 Ответ

0 голосов
/ 12 февраля 2020

Попробуйте это:

scala> val rdd=sc.parallelize(Array(Array("1","1","1","300","item1"), Array("2","1","2","300","item2"), Array("3","1","2","300","item3"), Array("4","2","3","300","item4"), Array("5","1","3","300","item5")))
rdd: org.apache.spark.rdd.RDD[Array[String]] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> rdd.map(array=>(array(0),array(1))).collect
res2: Array[(String, String)] = Array((1,1), (2,1), (3,1), (4,2), (5,1))
...