объединитьByKey не удается - PullRequest
1 голос
/ 07 октября 2019

Я копирую и вставляю точный код из учебника O'Reilly Learning Spark и получаю сообщение об ошибке: org.apache.spark.SparkException: задание прервано из-за сбоя этапа

Я пытаюсьЯ понимаю, что делает этот код, но у меня возникают проблемы с его пониманием, потому что он не запускается:

nums = sc.parallelize([1, 2, 3, 4])
sumCount = nums.combineByKey((lambda x: (x,1)),
 (lambda x, y: (x[0] + y, x[1] + 1)),
 (lambda x, y: (x[0] + y[0], x[1] + y[1])))

sumCount.map(lambda key, xy: (key, xy[0]/xy[1])).collectAsMap()

Ниже приведена полная ошибка, какие-либо идеи?

Job aborted due to stage failure: Task 3 in stage 26.0 failed 1 times, most recent failure: Lost task 3.0 in stage 26.0 (TID 73, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 480, in main
    process()
  File "/databricks/spark/python/pyspark/worker.py", line 470, in process
    out_iter = func(split_index, iterator)
  File "/databricks/spark/python/pyspark/rdd.py", line 2543, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/databricks/spark/python/pyspark/rdd.py", line 353, in func
    return f(iterator)
  File "/databricks/spark/python/pyspark/rdd.py", line 1905, in combineLocally
    merger.mergeValues(iterator)
  File "/databricks/spark/python/pyspark/shuffle.py", line 238, in mergeValues
    for k, v in iterator:
TypeError: 'int' object is not iterable

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:514)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:650)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:633)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:468)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:

1 Ответ

1 голос
/ 08 октября 2019

Хорошо, хорошо.

Если предположить, что приведенные выше числа не так хороши, поскольку это не кортеж (K, V), тогда предположим, что код выглядит следующим образом:

 data = sc.parallelize( [(0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)] )

 sumCount = data.combineByKey(lambda value: (value, 1),
                              lambda x, value: (x[0] + value, x[1] + 1),
                              lambda x, y: (x[0] + y[0], x[1] + y[1]))

 averageByKey = sumCount.map(lambda (label, (value_sum, count)): (label, value_sum / count))

 print averageByKey.collectAsMap()

В Spark с python2 (pyspark) вышеуказанный код работает нормально.

В Spark с python3 (pyspark) приведенный выше код выдает ошибку:

 averageByKey = sumCount.map(lambda (label, (value_sum, count)): (label, value_sum / count))

https://www.python.org/dev/peps/pep-3113/ объясняет, почему эта функция, «распаковка параметров кортежа», была удалена в Python 3. Кажется, что-то вроде подвоха для меня.

Самый простой способ решить эту проблему - передать вышеизложенноевведите код онлайн в https://www.pythonconverter.com/ и запустите конвертер кодов. Это:

data         = sc.parallelize( [(0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)] )

sumCount     = data.combineByKey(lambda value: (value, 1),
                                 lambda x, value: (x[0] + value, x[1] + 1),
                                 lambda x, y: (x[0] + y[0], x[1] + y[1]))

averageByKey = sumCount.map(lambda label_value_sum_count: (label_value_sum_count[0], label_value_sum_count[1][0] / label_value_sum_count[1][1]))

print(averageByKey.collectAsMap()) 

возвращает корректно:

{0: 3.0, 1: 10.0}

averageByKey теперь имеет другое объявление. Вы должны изучить и прочитать эту ссылку и познакомиться с ней, используя Конвертер Python 2 в 3. Экономит некоторое время, и вы можете облегчить свой путь в него. У уважаемого сотрудника SO также были некоторые проблемы с этим, так что у вас это есть, не так просто.

...