Не удается выполнить многопроцессорную обработку Python - PullRequest
3 голосов
/ 28 февраля 2012

Мне нужно запустить функцию для каждого из элементов моей базы данных.

Когда я пытаюсь выполнить следующее:

from multiprocessing import Pool
from pymongo import Connection

def foo():
...


connection1 = Connection('127.0.0.1', 27017)
db1 = connection1.data

my_pool = Pool(6)
my_pool.map(foo, db1.index.find())

Я получаю следующую ошибку:

Задание 1, «python myscript.py» прекращается по сигналуSIGKILL (Принудительный выход)

Я думаю, это вызвано тем, что db1.index.find() съел весь доступный баран, пытаясь вернуть миллионы элементов базы данных ...

Как я долженизменить мой код, чтобы он работал?

Некоторые журналы находятся здесь:

dmesg | tail -500 | grep memory
[177886.768927] Out of memory: Kill process 3063 (python) score 683 or sacrifice child
[177891.001379]  [<ffffffff8110e51a>] out_of_memory+0xfa/0x250
[177891.021362] Out of memory: Kill process 3063 (python) score 684 or sacrifice child
[177891.025399]  [<ffffffff8110e51a>] out_of_memory+0xfa/0x250

Фактическая функция ниже:

def create_barrel(item):
    connection = Connection('127.0.0.1', 27017)
    db = connection.data
    print db.index.count()
    barrel = []
    fls = []
    if 'name' in item.keys():
        barrel.append(WhitespaceTokenizer().tokenize(item['name']))
        name = item['name']
    elif 'name.utf-8' in item.keys():
        barrel.append(WhitespaceTokenizer().tokenize(item['name.utf-8']))
        name = item['name.utf-8']
    else:
        print item.keys()
    if 'files' in item.keys():
        for file in item['files']:
            if 'path' in file.keys():
                barrel.append(WhitespaceTokenizer().tokenize(" ".join(file['path'])))
                fls.append(("\\".join(file['path']),file['length']))
            elif 'path.utf-8'  in file.keys():
                barrel.append(WhitespaceTokenizer().tokenize(" ".join(file['path.utf-8'])))
                fls.append(("\\".join(file['path.utf-8']),file['length']))
            else:
                print file
                barrel.append(WhitespaceTokenizer().tokenize(file))
    if len(fls) < 1:
        fls.append((name,item['length']))
    barrel = sum(barrel,[])
    for s in barrel:
        vs = re.findall("\d[\d|\.]*\d", s) #versions i.e. numbes such as 4.2.7500 
    b0 = []
    for s in barrel:
        b0.append(re.split("[" + string.punctuation + "]", s))
    b1 = filter(lambda x: x not in string.punctuation, sum(b0,[]))
    flag = True
    while flag:
        bb = []
        flag = False
        for bt in b1:
            if bt[0] in string.punctuation:
                bb.append(bt[1:])
                flag = True
            elif bt[-1] in string.punctuation:
                bb.append(bt[:-1])
                flag = True
            else:
                bb.append(bt)
        b1 = bb
    b2 = b1 + barrel + vs
    b3 = list(set(b2))
    b4 = map(lambda x: x.lower(), b3)
    b_final = {}
    b_final['_id'] = item['_id']
    b_final['tags'] = b4
    b_final['name'] = name
    b_final['files'] = fls
    print db.barrels.insert(b_final)

Я заметил интересную вещь.Затем я нажимаю Ctrl + C, чтобы остановить процесс, и получаю следующее:

python index2barrel.py 
Traceback (most recent call last):
  File "index2barrel.py", line 83, in <module>
    my_pool.map(create_barrel, db1.index.find, 6)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 227, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 280, in map_async
    iterable = list(iterable)
TypeError: 'instancemethod' object is not iterable

Я имею в виду, почему многопроцессорная система пытается преобразовать что-то в список?Разве это не источник проблемы?

из трассировки стека:

brk(0x231ccf000)                        = 0x231ccf000
futex(0x1abb150, FUTEX_WAKE_PRIVATE, 1) = 1
sendto(3, "+\0\0\0\260\263\355\356\0\0\0\0\325\7\0\0\0\0\0\0data.index\0\0"..., 43, 0, NULL, 0) = 43
recvfrom(3, "Some text from my database."..., 491663, 0, NULL, NULL) = 491663
... [manymany times]
brk(0x2320d5000)                        = 0x2320d5000
.... manymany times

Приведенный выше пример отправляется и отправляется в выводе strace, и по какой-то причине strace -o logfile python myscript.py не останавливается.Он просто ест все доступные оперативной памяти и записывает в файл журнала.

ОБНОВЛЕНИЕ.Использование imap вместо карты решило мою проблему.

Ответы [ 2 ]

2 голосов
/ 29 февраля 2012

Поскольку операция find() возвращает курсор функции карты, а поскольку вы говорите, что она работает без проблем, когда вы делаете for item in db1.index.find(): create_barrel(item), похоже, что функция create_barrel в порядке.

Можете ли вы попытаться ограничить количество результатов, возвращаемых курсором, и посмотреть, поможет ли это?Я думаю, что синтаксис будет:

db1.index.find().limit(100)

Если бы вы могли попробовать это и посмотреть, поможет ли это, это может помочь выяснить причину проблемы.

EDIT1: Я думаю, что вы поступаете неправильно, используя функцию карты - я думаю, вы должны использовать map_reduce в драйвере питона mongo - таким образом, функция карты будет выполняться процессом mongod.

1 голос
/ 28 февраля 2012
Функция

map () передает элементы в виде фрагментов для данной функции.По умолчанию этот размер вычисляется следующим образом ( ссылка на источник ):

chunksize, extra = divmod(len(iterable), len(self._pool) * 4)

Это, вероятно, приводит к слишком большому размеру куска в вашем случае и позволяет процессу исчерпать память.Попробуйте установить размер чанка вручную следующим образом:

my_pool.map(foo, db1.index.find(), 100)

РЕДАКТИРОВАТЬ: Вы также должны рассмотреть возможность повторного использования соединения БД и закрытия их после использования.Теперь вы создаете новое соединение БД для каждого элемента и не вызываете close() для них.

EDIT2: также проверьте, попадает ли цикл while в бесконечный цикл (объяснил бы симптомы).

EDIT3: На основе добавленной вами обратной трассировки функция карты пытается преобразовать курсорв список, вызывая выборку всех элементов одновременно.Это происходит потому, что он хочет найти количество предметов в наборе.Это часть map() кода из pool.py :

if not hasattr(iterable, '__len__'):
    iterable = list(iterable)

Вы можете попробовать это, чтобы избежать преобразования в список:

cursor = db1.index.find()
cursor.__len__ = cursor.count()
my_pool.map(foo, cursor)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...