Как удалить элементы, как удалить элементы из одного rdd на основе другого rdd и создать новый rdd в pyspark? - PullRequest
0 голосов
/ 04 декабря 2018

Я создал 2 Rdd, как показано ниже

rdd=sc.parallelize([(0,'A'),(0,'B'),(1,'D'),(1,'B'),(1,'C'),(2,"A"),(2, "B"),(2, "E")])
rdd1=rdd.groupByKey().map(lambda x :list(x[1]))
rdd1.collect()
[['A', 'B'], ['D', 'B', 'C'], ['A', 'B', 'E']]
rdd2=sc.parallelize(['D','E'])
rdd2.collect()
Out[204]: ['D', 'E']

Теперь я хочу удалить элементы из rdd1, если он представлен в rdd2, т.е.

У меня есть 2 элемента внутри rdd2 ('D','E')

Теперь я хочу удалить эти элементы из rdd1.

мой ожидаемый rdde3:

[['A', 'B'], ['B', 'C'], ['A', 'B']]

1 Ответ

0 голосов
/ 04 декабря 2018

Сначала соберите все элементы второго rdd в список.Примените условие фильтрации, затем выполните групповую обработку.

from pyspark import SparkContext

sc = SparkContext('local')
rdd=sc.parallelize([(0,'A'),(0,'B'),(1,'D'),(1,'B'),(1,'C'),(2,"A"),(2, "B"),(2, "E")])
print(rdd.collect())
rdd1=rdd.groupByKey().map(lambda x :list(x[1]))
list1 = rdd1.collect()
print(list1)
rdd2=sc.parallelize(['D','E'])
list2 =rdd2.collect()
print(list2)

rdd2list = rdd2.collect()
filteredrdd = rdd.filter(lambda x: x[1] not in rdd2list)
finalrdd=filteredrdd.groupByKey().map(lambda x :list(x[1]))
print(finalrdd.collect())

Вот итоговый вывод:

[['A', 'B'], ['B', 'C'], ['A', 'B']]

Обновите согласно вашему комментарию:

def filter_list(x):
    return [ele for ele in x if ele not in rdd2list]


final2rdd = rdd1.map(lambda x: filter_list(x))
print(final2rdd.collect())

Вотвывод final2rdd, такой же, как и раньше:

[['A', 'B'], ['B', 'C'], ['A', 'B']]
...