pyspark Udf не работает должным образом, когда применяется преобразование карты с трансляцией? - PullRequest
0 голосов
/ 07 декабря 2018

У меня есть два списка, как показано ниже

l=[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B'],['B','C'],['B']]
x=[('A', 'B'), ('A', 'C')]

Я хочу удалить из списка списков l все элементы, которые не содержат все элементы ни в одном из tuple sв списке x.Другими словами, должен быть хотя бы один tuple в x, для которого все элементы, которые соответствуют кортежу, присутствуют в элементах l.

На основе моего последнего вопроса, мне было дано следующее решение в python:

print([l_ for l_ in l if any(all(e in l_ for e in x_) for x_ in x)])

, которое дает желаемый результат:

[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B']]

Теперь я пытаюсь повторить ту же операцию с pysparkrdd, но я не получаю ожидаемого результата.

Вот что я пытался:

rddsort=sc.parallelize(l)
broadcastVar = sc.broadcast(x)

def flist(unique_product_List,x):
    filter_list = [
        l_ for l_ in unique_product_List 
        if any(all(e in l_ for e in x_) for x_ in x)
    ]

    return filter_list

rddsort=rddsort.map(lambda flist(x[0],broadcastVar.value)) 
print(rddsort.collect())

В результате я получаю список пустых списков:

[[], [], [], [], [], []]

Но мой ожидаемый результат должен быть таким жекак указано выше.

1 Ответ

0 голосов
/ 07 декабря 2018

Вам нужен фильтр на rdd (не карта).Фильтр будет проверять наличие условий в каждой строке и удалять те, которые не совпадают.Здесь условие состоит в том, что значение строки (список _l = l [0]) должно иметь все элементы в одном из списков в x.

l=[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B'],['B','C'],['B']]
x=[('A', 'B'), ('A', 'C')]
rddsort=sc.parallelize(l)

rddsort=rddsort.filter(lambda l_: any(all(e in l_ for e in x_) for x_ in x)) 
print(rddsort.collect())

Выходные данные

[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B']]

Обновление:С широковещательной переменной в функции:

l=[['A', 'B', 'C'], ['A', 'C'], ['A', 'B', 'C'], ['A', 'B'],['B','C'],['B']]
x=[('A', 'B'), ('A', 'C')]
rddsort=sc.parallelize(l)
broadcastVar = sc.broadcast(x)

def flist(row):
    filter_flag = any(all(e in l_ for e in x_) for x_ in broadcastVar.value)
    return filter_flag

rddsort=rddsort.filter(flist) 
print(rddsort.collect())
...