Странное поведение с аргументами executor.submit - PullRequest
0 голосов
/ 28 марта 2020

Короче, я пишу скребок для сбора предметов с определенного сайта (более 200 000 предметов). Они разделены на 8 различных разделов, и каждый раздел содержит более 200 страниц со 100 ссылками на страницу (каждая ссылка открывает страницу элемента - сочная информация, которую мне нужно собрать). Без многопоточности или многопроцессорности это займет около 10 дней. Моей первоначальной идеей было распространение 100 ссылок на страницу через threadpool. Это отлично работало, но я все еще последовательно просматривал 8 различных разделов и решил, что хочу использовать процессор. Поэтому я подумал о том, чтобы разделить 8 секций на 8 различных процессов с помощью processpool. Все пошло не так.

В конце мне нужно записать все данные, которые возвращаются из каждого элемента, в один массивный CSV-файл. Я использую питонов csv.DictWriter и пишу каждые 10 000 штук или около того.

Поток такой: Distribute 8 different processes to handle each section, pass them the function to use and the writer object --> Each process has its own queue on a separate thread, and it waits for data --> Distribute 100 links across a threadpool, each result that comes back gets pushed into the process's corresponding queue --> once one of the queue passes 10,000 items, it locks the csv file (temporarily) and writes its data.

Код проблематики c выглядит примерно так:

def get_all_items(section_num, writer_obj):
   print("Hey I'm in here")
   write_que = queue.Queue(1000)
   worker = Thread(target=wait_and_write, args=(write_que,), daemon=True)
   worker.start()
   # blah blah I'm getting all the data and pushing it to the queue and writing somewhere else

def distribute_items(func, writer_obj):
    with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
        futures = {executor.submit(func, i, writer_obj):i for i in range(1, 9)}
        for future in concurrent.futures.as_completed(futures):
            logging.info(f"Section #{futures[future]} is done")

if __name__ == '__main__':
    logging.info("Program started.")
    f = open('tmp.csv', 'w+', newline='')
    w = csv.DictWriter(f, constants.CSV_HEADER)
    w.writeheader()
    distribute_items(get_all_items, w)
    logging.info("Finished.")
    f.close()

Это не работает, по какой-то причине, которую я не могу понять. Вывод:

Program started.
Section #1 is done
Section #2 is done
Section #3 is done
Section #4 is done
Section #5 is done
Section #6 is done
Section #7 is done
Section #8 is done
Finished.

executor.submit никогда не вызывает функцию func. После большого количества испытаний она не вызывала функцию из-за аргумента writer_obj. Я создал функцию тестирования:

def testing(the_arg):
    # if the_arg is a number, dictionary, anything else - this function gets called
    print(the_arg)

futures = [executor.submit(testing, i) for i in range(1, 9)] 
""" If I use i as the parameter (which in this case is a number), the function gets called. If I pass in 
writer_obj, the function never gets called. """

Что-то о том, что я передаю объект записи в качестве параметра, все портит, и я нигде не могу найти объяснения этому.

TL; DR По какой-то причине python игнорирует вызов функции с исполнителем ProcessPool, если я передаю объект записи в качестве параметра функции. Все остальные типы параметров работают (или те, которые я проверял).

Кто-нибудь знает почему?

...