PySpark reduByKey продолжает сбой, когда количество ключей мало по сравнению с количеством значений на ключ - PullRequest
1 голос
/ 31 января 2020

У меня есть следующий код.

x = sc.parallelize([
  (1, [3]),
  (1, [4])
])

y = x.reduceByKey(lambda accum, n: accum.extend(n))
print(y.collect())

Я ожидал, что y будет

[(1, [3, 4])]

, однако y в итоге будет

[(1, None)]

Вопрос 1: Почему я получаю None вместо массива?

Чтобы исправить вышесказанное, я делаю следующее.

import itertools

def merge(accum, n):
  arr = [accum, n]
  return list(itertools.chain.from_iterable(arr))

x = sc.parallelize([
  (1, [3]),
  (1, [4])
])

y = x.reduceByKey(lambda accum, n: merge(accum, n))
print(y.collect())

На этот раз y равно [(1, [3, 4])].

Вопрос 2: Как слияние (или уменьшение) с itertools работает во втором случае?

Когда я применяю подход itertools к очень большому набору данных (миллионы записи), затем я получаю следующую ошибку.

Py4JJavaError: Произошла ошибка при вызове z: org. apache .spark.api. python .PythonRDD.collectAndServe. : org. apache .spark.SparkException: задание прервано из-за сбоя этапа: сбой задачи 13 на этапе 108.0 4 раза, последний сбой: сбой задачи 13.3 на этапе 108.0 (TID 14305, 15.5.15.31, исполнитель 8): org . apache .spark.api. python .PythonException: обратная трассировка (последний вызов был последним): файл "/databricks/spark/python/pyspark/worker.py", строка 480, в файле основного процесса () "/ databricks / spark / python / pyspark / worker.py ", строка 472, в процессе файл serializer.dump_stream (out_iter, outfile) Файл" /databricks/spark/python/pyspark/serializers.py ", строка 509, в dump_stream write_int (len (bytes), stream) Файл "/databricks/spark/python/pyspark/serializers.py", строка 833, в write_int stream.write (struct.pack ("! i", value)) struct.error: 'i' требуется формат -2147483648 <= число <= 2147483647 </p>

Вопрос 3: Поиск по inte rnet не дает никакой подсказки об этой ошибке. Что означает эта ошибка?

Я хочу сгруппировать элементы в ключе PairRDD по ключам, а затем перебрать значения, связанные с одним ключом, для создания логического объекта. Если я использую функцию PySpark groupByKey, я сталкиваюсь с пределами Spark в 2 ГБ.

Вопрос 4: Есть ли идеи о том, как сгруппировать элементы по ключам, а затем преобразовать все эти элементы (по ключам) во что-то другое (не вызывая ограничений памяти)?

В очень большом наборе данных, попытка следующего дает мне RecursionError: maximum recursion depth exceeded while pickling an object.

y = x.reduceByKey(lambda a, b: [a, b])

1 Ответ

0 голосов
/ 31 января 2020

ANS1. Функция extend () расширяет список, но возвращает None. Если вы хотите добавить два массива, вы должны использовать оператор «+».

x = sc.parallelize([
  (1, [3]),
  (1, [4])
])

y = x.reduceByKey(lambda accum, n: accum+n)
print(y.collect())

Подробнее об этом можно прочитать здесь .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...