как получить доступ к СДР в другом СДР? - PullRequest
0 голосов
/ 19 января 2019

У меня есть 2 RDD (в pyspark) в форме rdd1=(id1, value1) и rdd2=(id2, value2), где id уникальны (т. Е. Все id1 отличаются от id2).

У меня есть третий RDD в форме resultRDD=((id1, id2), value3).я хочу отфильтровать этот последний, чтобы сохранить только элемент с value3 > (value1+value2).

, если я получаю доступ к rdd1 и rdd2, я получаю следующее исключение:

pickle.PicklingError: Could not serialize object: Exception: It appears that you
 are attempting to broadcast an RDD or reference an RDD from an action or transf
ormation. RDD transformations and actions can only be invoked by the driver, not
 inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.co
unt() * x) is invalid because the values transformation and count action cannot
be performed inside of the rdd1.map transformation. For more information, see SP
ARK-5063.

Так что этолучшая стратегия для доступа к rdd1 и rdd2 для того, чтобы отфильтровать результатRDD?

solution1:

, если я brodcast rdd1 и rdd2, это работает, но я думаю, что это не оптимизированное решение, так как rdd1 и rdd2 огромны.

solution2:

Вместо того, чтобы транслировать rdd1 и rdd2, мы можем собрать rdd1 и rdd2 и, таким образом, мы можем выполнить фильтрацию.Так, пожалуйста, каково эффективное решение в моем случае?

моя функция выглядит так:

def filterResultRDD(resultRDD, rdd1, rdd2):


    source = rdd1.collect()
    target = rdd2.collect()
    f = resultRDD.filter(lambda t: t[1] >= getElement(source, t[0][0])+ getElement(target, t[0][1])).cache()
    return f

def getElement(mydata, key):
    return [item[1] for item in mydata if item[0] == key][0]    

1 Ответ

0 голосов
/ 19 января 2019

Сначала о решениях, которые вы предложили:
решение2 :
Никогда не собирай rdd.
Если вы соберете rdd, это означает, что ваше решение не будет масштабируемым, или это означает, что вам не нужен rdd.
решение1 :
Аналогично ссылке на solution2, но с некоторыми исключениями, ваш случай не является одним из таких исключений.

Как уже упоминалось, "искровой" способ сделать это - использовать " join ".
Конечно, нет необходимости преобразовывать данные в фрейм искры.

Вот решение:

rdd1 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', 5)])
rdd2 = sc.parallelize([('aa', 1), ('bb', 2), ('cc', 3), ('dd', 4), ('ee', 5)])
rdd3 = sc.parallelize([(('a', 'aa'), 1), (('b', 'dd'), 8), (('e', 'aa'), 34), (('c', 'ab'), 23)])

print rdd3.map(lambda x: (x[0][0], (x[0][1], x[1])))\
.join(rdd1)\
.map(lambda x: (x[1][0][0], (x[0], x[1][0][1], x[1][1]))).join(rdd2)\
.filter(lambda x: x[1][0][1] > x[1][0][2] + x[1][1])\
.map(lambda x: ((x[1][0][0], x[0]), x[1][0][1]))\
.collect()

--> [(('b', 'dd'), 8), (('e', 'aa'), 34)]
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...