Создайте Tuple из Array (Array [String) разных размеров, используя Scala - PullRequest
0 голосов
/ 20 ноября 2018

Я новичок в scala и пытаюсь создать из пары Tuple RDD типа Array (Array [String]), который выглядит следующим образом:

(122abc,223cde,334vbn,445das),(221bca,321dsa),(231dsa,653asd,698poq,897qwa)

Я пытаюсь создать пары Tupleиз этих массивов, так что первый элемент каждого массива является ключом, а любая другая часть массива является значением.Например, результат будет выглядеть следующим образом:

122abc    223cde
122abc    334vbn
122abc    445das
221bca    321dsa
231dsa    653asd
231dsa    698poq
231dsa    897qwa

Я не могу понять, как отделить первый элемент от каждого массива, а затем сопоставить его с каждым другим элементом.

Ответы [ 4 ]

0 голосов
/ 20 ноября 2018

Использование df и explode.

  val df =   Seq(
      Array("122abc","223cde","334vbn","445das"),
      Array("221bca","321dsa"),
      Array("231dsa","653asd","698poq","897qwa")
    ).toDF("arr")
    val df2 = df.withColumn("key", 'arr(0)).withColumn("values",explode('arr)).filter('key =!= 'values).drop('arr).withColumn("tuple",struct('key,'values))
    df2.show(false)
    df2.rdd.map( x => Row( (x(0),x(1)) )).collect.foreach(println)

Вывод:

+------+------+---------------+
|key   |values|tuple          |
+------+------+---------------+
|122abc|223cde|[122abc,223cde]|
|122abc|334vbn|[122abc,334vbn]|
|122abc|445das|[122abc,445das]|
|221bca|321dsa|[221bca,321dsa]|
|231dsa|653asd|[231dsa,653asd]|
|231dsa|698poq|[231dsa,698poq]|
|231dsa|897qwa|[231dsa,897qwa]|
+------+------+---------------+


[(122abc,223cde)]
[(122abc,334vbn)]
[(122abc,445das)]
[(221bca,321dsa)]
[(231dsa,653asd)]
[(231dsa,698poq)]
[(231dsa,897qwa)]

Update1:

Использование парного rdd

val df =   Seq(
  Array("122abc","223cde","334vbn","445das"),
  Array("221bca","321dsa"),
  Array("231dsa","653asd","698poq","897qwa")
).toDF("arr")
import scala.collection.mutable._
val rdd1 = df.rdd.map( x => { val y = x.getAs[mutable.WrappedArray[String]]("arr")(0); (y,x)} )
val pair = new PairRDDFunctions(rdd1)
pair.flatMapValues( x => x.getAs[mutable.WrappedArray[String]]("arr") )
    .filter( x=> x._1 != x._2)
    .collect.foreach(println)

Результаты:

(122abc,223cde)
(122abc,334vbn)
(122abc,445das)
(221bca,321dsa)
(231dsa,653asd)
(231dsa,698poq)
(231dsa,897qwa)
0 голосов
/ 20 ноября 2018

Преобразуйте ваш элемент ввода в seq и все, а затем попробуйте написать оболочку, которая даст вам List(List(item1,item2), List(item1,item2),...)

Попробуйте код ниже

val seqs = Seq("122abc","223cde","334vbn","445das")++
Seq("221bca","321dsa")++
Seq("231dsa","653asd","698poq","897qwa")

Напишите оболочку для преобразования seq впара из двух

def toPairs[A](xs: Seq[A]): Seq[(A,A)] = xs.zip(xs.tail)

Теперь отправьте ваш seq в качестве параметров, и он выдаст пару из двух

toPairs(seqs).mkString(" ")

После того, как он будет преобразован в строку, вы получите вывод, подобный

res8: String = (122abc,223cde) (223cde,334vbn) (334vbn,445das) (445das,221bca) (221bca,321dsa) (321dsa,231dsa) (231dsa,653asd) (653asd,698poq) (698poq,897qwa)

Теперь вы можете конвертировать вашу строку, однако, вы хотите.

0 голосов
/ 20 ноября 2018

Если я правильно читаю, суть вашего вопроса связана с отделением головки (первого элемента) внутренних массивов от хвоста (оставшихся элементов), которую вы можете использовать head и * 1002. * методы. СДР во многом похожи на списки Scala, поэтому вы можете делать все это с помощью кода, похожего на чистый Scala.

С учетом следующего ввода СДР:

val input: RDD[Array[Array[String]]] = sc.parallelize(
  Seq(
    Array(
      Array("122abc","223cde","334vbn","445das"),
      Array("221bca","321dsa"),
      Array("231dsa","653asd","698poq","897qwa")
    )
  )
)

Следующее должно делать то, что вы хотите:

val output: RDD[(String,String)] =
  input.flatMap { arrArrStr: Array[Array[String]] =>
    arrArrStr.flatMap { arrStrs: Array[String] =>
      arrStrs.tail.map { value => arrStrs.head -> value }
    }
  }

И на самом деле, из-за того, как составлен flatMap / map, вы можете переписать его как-для-понимания .:

val output: RDD[(String,String)] =
  for {
    arrArrStr: Array[Array[String]] <- input
    arrStr: Array[String] <- arrArrStr
    str: String <- arrStr.tail
  } yield (arrStr.head -> str)

То, с чем вы идете, в конечном счете, зависит от личных предпочтений (хотя в этом случае я предпочитаю последнее, поскольку вам не нужно слишком много отступать от кода).

Для проверки:

output.collect().foreach(println)

Следует распечатать:

(122abc,223cde)
(122abc,334vbn)
(122abc,445das)
(221bca,321dsa)
(231dsa,653asd)
(231dsa,698poq)
(231dsa,897qwa)
0 голосов
/ 20 ноября 2018

Это классическая операция сгиба;но сворачивание в Spark вызывает aggregate:

// Start with an empty array
data.aggregate(Array.empty[(String, String)]) { 
  // `arr.drop(1).map(e => (arr.head, e))` will create tuples of 
  // all elements in each row and the first element.
  // Append this to the aggregate array.
  case (acc, arr) => acc ++ arr.drop(1).map(e => (arr.head, e))
}

Решением является среда без Spark:

scala> val data = Array(Array("122abc","223cde","334vbn","445das"),Array("221bca","321dsa"),Array("231dsa","653asd","698poq","897qwa"))
scala> data.foldLeft(Array.empty[(String, String)]) { case (acc, arr) =>
     |     acc ++ arr.drop(1).map(e => (arr.head, e))
     | }
res0: Array[(String, String)] = Array((122abc,223cde), (122abc,334vbn), (122abc,445das), (221bca,321dsa), (231dsa,653asd), (231dsa,698poq), (231dsa,897qwa))
...