Python Многопроцессорность - Процессы, повторно обрабатывающие одни и те же данные. - PullRequest
0 голосов
/ 25 мая 2020

У меня есть pandas фрейм данных, поле которого мне нужно проверить на наличие бессмысленных слов с помощью Wiktionary API. Имея огромное количество слов в фреймворке данных и учитывая 0,6 секунды, которые требуется для запроса GET, я решил использовать многопроцессорность для ускорения процесса.

Я разделил свой фрейм данных на части, используя следующий код:

def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

dataChunks = list(chunks(data, 5292))

Следующий код отвечает за проверку наличия каждого слова в поле UserInput согласно Wiktionary и запись значимых записей в файл CSV:

def check_dict(chunk):
    for index, row in chunk.iterrows(): 
        query = str(row['UserInput'])

        # Dictionary Check
        re.sub(r"[^a-zA-Z0-9]+", ' ', query)
        words = str(query).split()

        for word in words:
            response = requests.get('https://en.wiktionary.org/w/api.php?action=query&titles=' + word.lower() + '&format=json')
            resData = response.json()

            if list(resData['query']['pages'].keys())[0] != '-1':
                file = open('logsAllDict2.csv', 'a', newline='')
                with file:
                    writer = csv.writer(file)
                    writer.writerow(row)

И я использовал следующее код для создания различных процессов для обработки кусков:

jobs = []
for i in dataChunks:
    j = Process(target=check_dict, args=(i,))
    jobs.append(j)
for j in jobs:
    print('starting job {}'.format(str(j)))        
    j.start()

Это моя первая попытка многопроцессорности, и я здесь совершенно запутался. Кажется, что процессы записывают одни и те же данные в файл CSV, в некоторых случаях даже 10 раз. Любая помощь будет высоко ценится. Если есть какой-то лучший способ go об этой проблеме, я все слышу.

Изменить: даже если я использую Pool, например:

if __name__ == '__main__':
    with Pool(7) as p:
        result = p.map(check_dict, dataChunks) 
        pool.close()
        pool.join()

, у меня такая же проблема. Я уверен, что что-то делаю не так, просто не знаю что.

...