multiprocessing.pool.map и функция с двумя аргументами - PullRequest
10 голосов
/ 15 декабря 2011

Я использую multiprocessing.Pool()

Вот что я хочу для пула:

def insert_and_process(file_to_process,db):
    db = DAL("path_to_mysql" + db)
    #Table Definations
    db.table.insert(**parse_file(file_to_process))
    return True

if __name__=="__main__":
    file_list=os.listdir(".")
    P = Pool(processes=4)
    P.map(insert_and_process,file_list,db) # here having problem.

Я хочу передать 2 аргумента. Я хочу инициализировать только 4 соединения с БД.(здесь мы попытаемся создать соединение при каждом вызове функции, так что, возможно, миллионы из них вызовут IO Freezed to death).если я смогу создать 4 дБ соединения и 1 для каждого процесса, все будет в порядке.

Есть ли какое-нибудь решение для пула?или я должен отказаться от этого?

РЕДАКТИРОВАТЬ:

С помощью вас обоих я получил это, сделав это:

args=zip(f,cycle(dbs))
Out[-]: 
[('f1', 'db1'),
 ('f2', 'db2'),
 ('f3', 'db3'),
 ('f4', 'db4'),
 ('f5', 'db1'),
 ('f6', 'db2'),
 ('f7', 'db3'),
 ('f8', 'db4'),
 ('f9', 'db1'),
 ('f10', 'db2'),
 ('f11', 'db3'),
 ('f12', 'db4')]

Так вот как это будет работать,я собираюсь переместить код соединения с БД на основной уровень и сделать это:

def process_and_insert(args):

    #Table Definations
    args[1].table.insert(**parse_file(args[0]))
    return True

if __name__=="__main__":
    file_list=os.listdir(".")
    P = Pool(processes=4)

    dbs = [DAL("path_to_mysql/database") for i in range(0,3)]
    args=zip(file_list,cycle(dbs))
    P.map(insert_and_process,args) # here having problem.

Да, я собираюсь проверить его и сообщить вам, ребята.

Ответы [ 5 ]

26 голосов
/ 15 декабря 2011

Документация Pool не говорит о способе передачи более одного параметра в целевую функцию - я пробовал просто передавать последовательность, но не раскрывается (один элемент последовательности для каждого параметра).

Однако вы можете написать свою целевую функцию, чтобы ожидать, что первый (и единственный) параметр будет кортежем, в котором каждый элемент является одним из ожидаемых вами параметров:

from itertools import repeat

def insert_and_process((file_to_process,db)):
    db = DAL("path_to_mysql" + db)
    #Table Definations
    db.table.insert(**parse_file(file_to_process))
    return True

if __name__=="__main__":
    file_list=os.listdir(".")
    P = Pool(processes=4)
    P.map(insert_and_process,zip(file_list,repeat(db))) 

(обратите внимание на дополнительные скобки в определении insert_and_process - python рассматривает это как один параметр, который должен быть последовательностью из 2 элементов. Первый элемент последовательности относится к первой переменной, а другой к второй)

8 голосов
/ 15 декабря 2011

Ваш пул породит четыре процесса, каждый из которых запускается собственным экземпляром интерпретатора Python. Вы можете использовать глобальную переменную для хранения объекта соединения с базой данных, так что для каждого процесса создается ровно одно соединение:

global_db = None

def insert_and_process(file_to_process, db):
    global global_db
    if global_db is None:
        # If this is the first time this function is called within this
        # process, create a new connection.  Otherwise, the global variable
        # already holds a connection established by a former call.
        global_db = DAL("path_to_mysql" + db)
    global_db.table.insert(**parse_file(file_to_process))
    return True

Так как Pool.map() и друзья поддерживают только рабочие функции с одним аргументом, вам нужно создать оболочку, которая перенаправляет работу:

def insert_and_process_helper(args):
    return insert_and_process(*args)

if __name__ == "__main__":
    file_list=os.listdir(".")
    db = "wherever you get your db"
    # Create argument tuples for each function call:
    jobs = [(file, db) for file in file_list]
    P = Pool(processes=4)
    P.map(insert_and_process_helper, jobs)
5 голосов
/ 09 сентября 2012

Не нужно использовать zip.Если, например, у вас есть 2 параметра, x и y, и каждый из них может получить несколько значений, например:

X=range(1,6)
Y=range(10)

Функция должна получить только один параметр и распаковать его внутри:*

И вы называете это так:

params = [(x,y) for x in X for y in Y]
pool.map(func, params)
2 голосов
/ 13 июня 2016

Для этой цели вы можете использовать библиотеку

from functools import partial 

, например

func = partial(rdc, lat, lng)
r = pool.map(func, range(8))

и

def rdc(lat,lng,x):
    pass 
2 голосов
/ 19 сентября 2012

Используя

params=[(x,y) for x in X for y in Y]

, вы создаете полную копию x и y, и это может быть медленнее, чем при использовании

from itertools import repeat
P.map(insert_and_process,zip(file_list,repeat(db)))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...