KeyError: '' Pyspark с пустой строкой ошибки (Spark RDD) - PullRequest
0 голосов
/ 08 октября 2019

Я выполняю простое упражнение, чтобы рекомендовать новых друзей на основе граничного списка общих друзей, вычисляя 20 лучших друзей каждого конкретного пользователя с учетом некоторых условий фильтрации. Я использую Spark RDD для выполнения этой задачи.

У меня есть список краев в all_friends, в котором края списка друзей хранятся в виде пары ключ-значение. График является ненаправленным, поэтому для каждого ('0', '1'), ('1', '0') также появляется

    all_friends.take(4)
[('0', '1'), ('0', '2'), ('1', '0'), ('1', '3')]

Так что часть моего кода содержит следующее:

    from collections import Counter
results = all_friends\
    .join(all_friends)\
    .filter(filter_conditions)\
    .map(lambda af1f2: (af1f2[1][0], af1f2[1][1]))\ #at this point each entry has form [(k,(v1,v2)], hence the lambda expression
    .groupByKey()\
    .mapValues(lambda v: Counter(v).most_common(20))

Однако после картыЯ получаю KeyError, отмеченную ниже. Это также происходит, если я поставлю .keys().collect() сразу после карты. Это странно, так как я не уверен, почему spark ищет ключ '' (пустую строку), когда он явно не существует в моем оригинальном rdd. Я не уверен, имеет ли это отношение к полному внешнему соединению. Может кто-нибудь посоветовать?

 An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 78.0 failed 3 times, most recent failure: Lost task 1.2 in stage 78.0 (TID 291, 100.103.89.116, executor 5): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in main
    process()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 367, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 390, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-155-140ba198945e>", line 2, in <lambda>
KeyError: ''

1 Ответ

1 голос
/ 08 октября 2019

filter_conditions выглядит некорректно. Вот рабочий код с кодом псевдофильтра

from pyspark import SparkConf
from pyspark.sql import SparkSession
from collections import Counter

conf = SparkConf().setAppName('Python Spark').set("spark.executor.memory", "1g")
spark_session = SparkSession.builder.config(conf=conf).getOrCreate()

all_friends = spark_session.sparkContext.parallelize([('0', '1'), ('0', '2'), ('1', '0'), ('1', '3'), ('1', '3')])


# [('0', '1'), ('0', '2'), ('1', '0'), ('1', '3')]

# print(all_friends.take(4).collect())

def filter_conditions(c):
    if c[0] == '1':
        return c


results = all_friends.join(all_friends).filter(filter_conditions).map(
    lambda af1f2: (af1f2[1][0], af1f2[1][1])).groupByKey().mapValues(lambda v: Counter(v).most_common(20))

print(results.collect())

выход

[('3', [('3', 4), ('0', 2)]), ('0', [('3', 2), ('0', 1)])]
...