Короче, я пишу скребок для сбора предметов с определенного сайта (более 200 000 предметов). Они разделены на 8 различных разделов, и каждый раздел содержит более 200 страниц со 100 ссылками на страницу (каждая ссылка открывает страницу элемента - сочная информация, которую мне нужно собрать). Без многопоточности или многопроцессорности это займет около 10 дней. Моей первоначальной идеей было распространение 100 ссылок на страницу через threadpool
. Это отлично работало, но я все еще последовательно просматривал 8 различных разделов и решил, что хочу использовать процессор. Поэтому я подумал о том, чтобы разделить 8 секций на 8 различных процессов с помощью processpool
. Все пошло не так.
В конце мне нужно записать все данные, которые возвращаются из каждого элемента, в один массивный CSV-файл. Я использую питонов csv.DictWriter
и пишу каждые 10 000 штук или около того.
Поток такой: Distribute 8 different processes to handle each section, pass them the function to use and the writer object -->
Each process has its own queue on a separate thread, and it waits for data -->
Distribute 100 links across a threadpool, each result that comes back gets pushed into the process's corresponding queue -->
once one of the queue passes 10,000 items, it locks the csv file (temporarily) and writes its data.
Код проблематики c выглядит примерно так:
def get_all_items(section_num, writer_obj):
print("Hey I'm in here")
write_que = queue.Queue(1000)
worker = Thread(target=wait_and_write, args=(write_que,), daemon=True)
worker.start()
# blah blah I'm getting all the data and pushing it to the queue and writing somewhere else
def distribute_items(func, writer_obj):
with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
futures = {executor.submit(func, i, writer_obj):i for i in range(1, 9)}
for future in concurrent.futures.as_completed(futures):
logging.info(f"Section #{futures[future]} is done")
if __name__ == '__main__':
logging.info("Program started.")
f = open('tmp.csv', 'w+', newline='')
w = csv.DictWriter(f, constants.CSV_HEADER)
w.writeheader()
distribute_items(get_all_items, w)
logging.info("Finished.")
f.close()
Это не работает, по какой-то причине, которую я не могу понять. Вывод:
Program started.
Section #1 is done
Section #2 is done
Section #3 is done
Section #4 is done
Section #5 is done
Section #6 is done
Section #7 is done
Section #8 is done
Finished.
executor.submit
никогда не вызывает функцию func
. После большого количества испытаний она не вызывала функцию из-за аргумента writer_obj
. Я создал функцию тестирования:
def testing(the_arg):
# if the_arg is a number, dictionary, anything else - this function gets called
print(the_arg)
futures = [executor.submit(testing, i) for i in range(1, 9)]
""" If I use i as the parameter (which in this case is a number), the function gets called. If I pass in
writer_obj, the function never gets called. """
Что-то о том, что я передаю объект записи в качестве параметра, все портит, и я нигде не могу найти объяснения этому.
TL; DR По какой-то причине python
игнорирует вызов функции с исполнителем ProcessPool
, если я передаю объект записи в качестве параметра функции. Все остальные типы параметров работают (или те, которые я проверял).
Кто-нибудь знает почему?