У меня есть следующий код.
x = sc.parallelize([
(1, [3]),
(1, [4])
])
y = x.reduceByKey(lambda accum, n: accum.extend(n))
print(y.collect())
Я ожидал, что y
будет
[(1, [3, 4])]
, однако y
в итоге будет
[(1, None)]
Вопрос 1: Почему я получаю None
вместо массива?
Чтобы исправить вышесказанное, я делаю следующее.
import itertools
def merge(accum, n):
arr = [accum, n]
return list(itertools.chain.from_iterable(arr))
x = sc.parallelize([
(1, [3]),
(1, [4])
])
y = x.reduceByKey(lambda accum, n: merge(accum, n))
print(y.collect())
На этот раз y
равно [(1, [3, 4])]
.
Вопрос 2: Как слияние (или уменьшение) с itertools
работает во втором случае?
Когда я применяю подход itertools
к очень большому набору данных (миллионы записи), затем я получаю следующую ошибку.
Py4JJavaError: Произошла ошибка при вызове z: org. apache .spark.api. python .PythonRDD.collectAndServe. : org. apache .spark.SparkException: задание прервано из-за сбоя этапа: сбой задачи 13 на этапе 108.0 4 раза, последний сбой: сбой задачи 13.3 на этапе 108.0 (TID 14305, 15.5.15.31, исполнитель 8): org . apache .spark.api. python .PythonException: обратная трассировка (последний вызов был последним): файл "/databricks/spark/python/pyspark/worker.py", строка 480, в файле основного процесса () "/ databricks / spark / python / pyspark / worker.py ", строка 472, в процессе файл serializer.dump_stream (out_iter, outfile) Файл" /databricks/spark/python/pyspark/serializers.py ", строка 509, в dump_stream write_int (len (bytes), stream) Файл "/databricks/spark/python/pyspark/serializers.py", строка 833, в write_int stream.write (struct.pack ("! i", value)) struct.error: 'i' требуется формат -2147483648 <= число <= 2147483647 </p>
Вопрос 3: Поиск по inte rnet не дает никакой подсказки об этой ошибке. Что означает эта ошибка?
Я хочу сгруппировать элементы в ключе PairRDD
по ключам, а затем перебрать значения, связанные с одним ключом, для создания логического объекта. Если я использую функцию PySpark groupByKey
, я сталкиваюсь с пределами Spark в 2 ГБ.
Вопрос 4: Есть ли идеи о том, как сгруппировать элементы по ключам, а затем преобразовать все эти элементы (по ключам) во что-то другое (не вызывая ограничений памяти)?
В очень большом наборе данных, попытка следующего дает мне RecursionError: maximum recursion depth exceeded while pickling an object
.
y = x.reduceByKey(lambda a, b: [a, b])