Как заставить генератор работать в spark mapPartitions ()? - PullRequest
0 голосов
/ 16 февраля 2019

Я пытаюсь использовать mapPartiton в spark для обработки большого текстового корпуса: допустим, у нас есть некоторые полуобработанные данные, которые выглядят так:

    text_1 = [['A', 'B', 'C', 'D', 'E'],
    ['F', 'E', 'G', 'A', 'B'],
    ['D', 'E', 'H', 'A', 'B'],
    ['A', 'B', 'C', 'F', 'E'],
    ['A', 'B', 'C', 'J', 'E'],
    ['E', 'H', 'A', 'B', 'C'],
    ['E', 'G', 'A', 'B', 'C'],
    ['C', 'F', 'E', 'G', 'A'],
    ['C', 'D', 'E', 'H', 'A'],
    ['C', 'J', 'E', 'H', 'A'],
    ['H', 'A', 'B', 'C', 'F'],
    ['H', 'A', 'B', 'C', 'J'],
    ['B', 'C', 'F', 'E', 'G'],
    ['B', 'C', 'D', 'E', 'H'],
    ['B', 'C', 'F', 'E', 'K'],
    ['B', 'C', 'J', 'E', 'H'],
    ['G', 'A', 'B', 'C', 'F'],
    ['J', 'E', 'H', 'A', 'B']]

Каждая буква - это слово.У меня также есть словарь:

    V = ['D','F','G','C','J','K']
    text_1RDD = sc.parallelize(text_1)

, и я хочу запустить следующее в спарк:

    filtered_lists = text_1RDD.mapPartitions(partitions)

    filtered_lists.collect()

У меня есть эта функция:

    def partitions(list_of_lists,vc):

            for w in vc:

                iterator = []
                for sub_list in list_of_lists:

                    if w in sub_list:
                        iterator.append(sub_list)

        yield (w,len(iterator))

Если я запускаюэто так:

    c = partitions(text_1,V)
    for item in c:
        print(item)

возвращает верный счетчик

    ('D', 4)
    ('F', 7)
    ('G', 5)
    ('C', 15)
    ('J', 5)
    ('K', 1)

Однако я понятия не имею, как запустить его в спарк:

    filtered_lists = text_1RDD.mapPartitions(partitions)

    filtered_lists.collect()

Он имееттолько один аргумент и генерирует много ошибок при работе в Spark ...

Но даже если я кодирую словарь внутри функции разделов:

    def partitionsV(list_of_lists):
            vc = ['D','F','G','C','J','K']
            for w in vc:

                iterator = []
                for sub_list in list_of_lists:

                    if w in sub_list:
                        iterator.append(sub_list)

        yield (w,len(iterator))

.. Я получил это:

    filtered_lists = text_1RDD.mapPartitions(partitionsV)

    filtered_lists.collect()

вывод:

     [('D', 2),
     ('F', 0),
     ('G', 0),
     ('C', 0),
     ('J', 0),
     ('K', 0),
     ('D', 0),
     ('F', 0),
     ('G', 0),
     ('C', 0),
     ('J', 0),
     ('K', 0),
     ('D', 1),
     ('F', 0),
     ('G', 0),
     ('C', 0),
     ('J', 0),
     ('K', 0),
     ('D', 1),
     ('F', 0),
     ('G', 0),
     ('C', 0),
     ('J', 0),
     ('K', 0)]

Очевидно, генератор работал не так, как ожидалось.Я полностью застрял.Я очень новичок, чтобы зажечь.Буду очень признателен, если кто-нибудь сможет мне объяснить, что здесь происходит ...

1 Ответ

0 голосов
/ 16 февраля 2019

Это еще одна проблема с подсчетом слов, и mapPartitions не является инструментом для задания:

from operator import add

v = set(['D','F','G','C','J','K'])

result = text_1RDD.flatMap(v.intersection).map(lambda x: (x, 1)).reduceByKey(add)

И в результате получается

for x in result.sortByKey().collect(): 
    print(x) 
('C', 15)
('D', 4)
('F', 7)
('G', 5)
('J', 5)
('K', 1)
...