Мультипроцессор застрял в функции карты - PullRequest
1 голос
/ 27 апреля 2019

В настоящее время я делаю http-запрос, содержащий более 3 000 000 записей, используя метод нумерации страниц.Иногда происходит сбой вызова из-за ошибки сервера 104, поэтому я повторяю попытку, и она работает во второй или третий раз.

Поскольку запросов так много, я использую многопроцессорную функцию в python для ускорения этого процесса.Я использую машину с Ubuntu 16, python3.5 и 8-ядерную машину.Странно то, что все файлы записываются, и процесс «Завершает», то есть достигает конца диапазона (независимо от размера, 1 млн., 2 млн. Или 3 млн.), Но не пересекает строку пула.Так что мои сеансы tmux просто говорят: «Работа над датой (lastrecordnumber)», мне нужно, чтобы это произошло, чтобы я мог отправить электронное письмо, чтобы сообщить мне, что задача выполнена.

Я пробовал pool.map();pool.aysnc();pool.map_async(), похоже, что все они имеют одну и ту же проблему.

import http.client
from multiprocessing import Pool
from functools import partial


def get_raw_data(auth, url_conn, skip):
    headers = {'authorization': "Basic {}".format(auth)}
    sucess = None
    loop = 0
    while not sucess:
      try:
        conn = http.client.HTTPSConnection(url_conn)
        conn.request("GET", "SOME_API&$skip={}".format(skip), headers=headers)
        res = conn.getresponse()
        data = res.read()
        raw_data = json.loads(data.decode("utf-8"))
        sucess = 'yes'
      except Exception as e:
        print('stuck in loop {} {} {}'.format(skip, loop, e))
        loop += 1

    with open('{}.json'.format(skip), 'w') as outfile:
          json.dump(raw_data, outfile)

 def process_skips(skip):            
    print('Working on date {}'.format(skip))    
    get_raw_data(skip)

if __name__ == '__main__':
    print("We started at {}".format(dt.datetime.now()))
    n = range(0,3597351,5000)
    n = list(n)  
    pool = Pool(8)
    pool.map_async(process_skips, n)
    pool.close()
    pool.join()

1 Ответ

0 голосов
/ 27 апреля 2019

Использование пула в качестве менеджера контекста с использованием with, который заботится о закрытии / присоединении к процессам и кажется предпочтительным методом в документах.

if __name__ == '__main__':
    print("We started at {}".format(dt.datetime.now()))
    n = list(range(0,3597351,5000))
    with Pool(8) as pool:
        pool.map_async(process_skips, n)

Если ваш основной процесс работает и пишетваш файл правильно, это должно заставить ваши процессы закрываться правильно.

...