У меня есть pandas фрейм данных, поле которого мне нужно проверить на наличие бессмысленных слов с помощью Wiktionary API. Имея огромное количество слов в фреймворке данных и учитывая 0,6 секунды, которые требуется для запроса GET, я решил использовать многопроцессорность для ускорения процесса.
Я разделил свой фрейм данных на части, используя следующий код:
def chunks(lst, n):
for i in range(0, len(lst), n):
yield lst[i:i + n]
dataChunks = list(chunks(data, 5292))
Следующий код отвечает за проверку наличия каждого слова в поле UserInput согласно Wiktionary и запись значимых записей в файл CSV:
def check_dict(chunk):
for index, row in chunk.iterrows():
query = str(row['UserInput'])
# Dictionary Check
re.sub(r"[^a-zA-Z0-9]+", ' ', query)
words = str(query).split()
for word in words:
response = requests.get('https://en.wiktionary.org/w/api.php?action=query&titles=' + word.lower() + '&format=json')
resData = response.json()
if list(resData['query']['pages'].keys())[0] != '-1':
file = open('logsAllDict2.csv', 'a', newline='')
with file:
writer = csv.writer(file)
writer.writerow(row)
И я использовал следующее код для создания различных процессов для обработки кусков:
jobs = []
for i in dataChunks:
j = Process(target=check_dict, args=(i,))
jobs.append(j)
for j in jobs:
print('starting job {}'.format(str(j)))
j.start()
Это моя первая попытка многопроцессорности, и я здесь совершенно запутался. Кажется, что процессы записывают одни и те же данные в файл CSV, в некоторых случаях даже 10 раз. Любая помощь будет высоко ценится. Если есть какой-то лучший способ go об этой проблеме, я все слышу.
Изменить: даже если я использую Pool, например:
if __name__ == '__main__':
with Pool(7) as p:
result = p.map(check_dict, dataChunks)
pool.close()
pool.join()
, у меня такая же проблема. Я уверен, что что-то делаю не так, просто не знаю что.