интересно, почему пустой внутренний итератор вызывает не сериализуемое исключение с mapPartitionsWithIndex - PullRequest
0 голосов
/ 25 июня 2018

Я экспериментировал с картой Spark mapPartitionsWithIndex и столкнулся с проблемами, когда пытается вернуть итератор кортежа, который сам содержит пустой итератор.

Я пробовал несколько разных способов создания внутреннего итератора [через Iterator () и List (...). Iterator], и все дороги позволили мне получить эту ошибку:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0 in stage 0.0 (TID 2) had a not serializable result: scala.collection.LinearSeqLike$$anon$1
Serialization stack:
        - object not serializable (class: scala.collection.LinearSeqLike$$anon$1, value: empty iterator)
        - field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
        - object (class scala.Tuple2, (1,empty iterator))
        - element of array (index: 0)
        - array (class [Lscala.Tuple2;, size 1)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)

Мой пример кода приведен ниже. Обратите внимание, что при условии, что он работает нормально (пустой итератор возвращается как Значение mapPartitionsWithIndex.) Но при запуске с закомментированной версией При вызовах mapPartitionsWithIndex вы получите ошибку выше.

Если у кого-нибудь есть предложение, как это можно сделать, я был бы очень признателен.

import org.apache.spark.{Partition, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ANonWorkingExample extends App {
  val sparkConf = new SparkConf().setAppName("continuous").setMaster("local[*]")
  val sc = new SparkContext(sparkConf)
  val parallel: RDD[Int] = sc.parallelize(1 to 9)
  val parts: Array[Partition] = parallel.partitions

  val partRDD: RDD[(Int, Iterator[Int])] =
    parallel.coalesce(3).
      mapPartitionsWithIndex {
        (partitionIndex: Int, inputiterator: Iterator[Int]) =>
          val mappedInput: Iterator[Int] = inputiterator.map(_ + 1)
          // Iterator((partitionIndex, mappedInput)) // FAILS
          Iterator()   // no exception.. but not really what i want.

      }

  val data = partRDD.collect
  println("data:" + data.toList);
}

Ответы [ 2 ]

0 голосов
/ 04 июля 2018

Таким образом, глупая вещь, которую я делал, пыталась вернуть несериализуемую структуру данных: итератор, как ясно указано в трассировке стека, которую я получил.

И решение состоит в том, чтобы не использовать итератор.Скорее используйте коллекцию как Seq или List.Приведенный ниже пример программы иллюстрирует правильный способ сделать то, что я пытался сделать.

import org.apache.spark.{Partition, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object AWorkingExample extends App {
  val sparkConf = new SparkConf().setAppName("batman").setMaster("local[*]")
  val sc = new SparkContext(sparkConf)
  val parallel: RDD[Int] = sc.parallelize(1 to 9)
  val parts: Array[Partition] = parallel.partitions

  val partRDD: RDD[(Int, List[Int])] =
    parallel.coalesce(3).
      mapPartitionsWithIndex {
        (partitionIndex: Int, inputiterator: Iterator[Int]) =>
          val mappedInput: Iterator[Int] = inputiterator.map(_ + 1)
          Iterator((partitionIndex, mappedInput.toList)) // Note the .toList() call -- that makes it work
      }

  val data = partRDD.collect
  println("data:" + data.toList);
}

Кстати, то, что я пытался сделать изначально, - это увидеть конкретно, какие куски данных из моего распараллеленного-RDD структура была назначена на какой раздел.Вот результат, который вы получите, запустив программу:

data: List ((0, List (2, 3)), (1, List (4, 5, 6)), (2, List(7, 8, 9, 10)))

Интересно, что распределение данных могло бы быть более оптимально сбалансированным, но не было.Дело не в этом, но я подумал, что это интересно.

0 голосов
/ 01 июля 2018

Я не уверен, чего вы пытаетесь достичь, и я в некотором роде новичок по сравнению с некоторыми экспертами здесь.

Я представляю кое-что, что может дать вам представление о том, как делать то, что я думаю правильно, и сделать несколько комментариев:

  1. Вы, кажется, получаете разделы явно и вызываете mapPartitions - 1-йдля меня.
  2. RDD внутри mapPartitions и различные SPARK SCALA не будут летать;речь идет об итерациях, и я думаю, что вам нужно перейти на уровень SCALA.
  3. Сериализуемая ошибка возникает из-за выполнения List [Int].

Вот пример, показывающий разделение индекса вдольс этими соответствующими значениями индекса.

import org.apache.spark.{Partition, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
// from your stuff, left in

val parallel: RDD[Int] = sc.parallelize(1 to 9, 4)
val mapped =   parallel.mapPartitionsWithIndex{
                       (index, iterator) => {
                          println("Called in Partition -> " + index)
                          val myList = iterator.toList                          
                          myList.map(x => (index, x)).groupBy( _._1 ).mapValues( _.map( _._2 ) ).toList.iterator
                       }
                 }  
mapped.collect()

Это возвращает следующее, что немного напоминает то, что, как мне кажется, вы хотели:

res38: Array[(Int, List[Int])] = Array((0,List(1, 2)), (1,List(3, 4)), (2,List(5, 6)), (3,List(7, 8, 9)))

Заключительное примечание: документацию и тому подобное не так легко выполнить, выне получить все это из примера подсчета слов!

Итак, надеюсь, это поможет.

Я думаю, что это может привести вас к правильному пути туда, куда вы хотите пойти, я не совсем мог это увидеть, но, может быть, теперь вы можете видеть лес за деревьями.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...