1 - ПРОБЛЕМА
Я использую "spacy" на python для лемматизации текстовых документов.Существует 500 000 документов размером до 20 МБ чистого текста.
Проблема заключается в следующем: потребление пространственной памяти увеличивается со временем до использования всей памяти.
2 - ОБЩАЯ ИНФОРМАЦИЯ
Моя конфигурация оборудования: ЦП: Intel I7-8700K 3,7 ГГц (12 ядер) Память: 16 ГБ SSD: 1 ТБ GPU встроен, но не используется для этой задачи
Я использую "мультипроцессорность »для разделения задачи на несколько процессов (рабочих).Каждый работник получает список документов для обработки.Основной процесс осуществляет мониторинг дочерних процессов.Я запускаю «spacy» в каждом дочернем процессе один раз и использую этот один экземпляр spacy для обработки всего списка документов в работнике.
Трассировка памяти говорит следующее:
[Memorytrace - Top 10]
/ opt /velop / virtualenv / lib / python3.6 / site-packages / thinc / neural / mem.py: 68: размер = 45,1 МиБ, количество = 99, среднее = 467KiB
/ opt /velop / virtualenv / lib / python3.6 / posixpath.py: 149: размер = 40,3 МиБ, число = 694225, среднее = 61 B
: 487: размер =9550 КиБ, число = 77746, среднее = 126 В
/ opt /velop / virtualenv / lib / python3.6 / site-packages / dawg_python / wrapper.py: 33: размер = 7901 КиБ, количество = 6, среднее значение = 1317 КиБ
/ opt / development / virtualenv / lib / python3.6 / site-packages / spacy / lang / en / lemmatizer / _nouns.py: 7114: размер = 5273 КиБ, количество = 57494, среднее = 94 B
prepare_docs04.py: 372: размер = 4189 КиБ, число = 1, среднее = 4189 КиБ
/ opt / development / virtualenv / lib / python3.6 / site-packages / dawg_python / wrapper.py: 93: размер = 3949 КиБ, количество = 5, среднее = 790 КиБ
/ usr / lib / python3.6 / json / decoder.py: 355: размер = 1837 КиБ, число = 20456, среднее = 92 B
/ opt / development / virtualenv/lib/python3.6/site-packages/spacy/lang/en/lemmatizer/_adjectives.py:2828: размер = 1704 КиБ, число = 20976, среднее = 83 B
prepare_docs04.py: 373:размер = 1633 КБ, число = 1, среднее = 1633 КБ
3 - ОЖИДАНИЯ
Я видел хорошую рекомендацию для создания отдельного решения сервер-клиент [здесь] Можно ли сохранить объем памяти в памяти, чтобы сократить время загрузки?
Можно ли контролировать потребление памяти с помощью подхода "многопроцессорной обработки"?
4 - КОД
Вот упрощенная версия моего кода:
import os, subprocess, spacy, sys, tracemalloc
from multiprocessing import Pipe, Process, Lock
from time import sleep
# START: memory trace
tracemalloc.start()
# Load spacy
spacyMorph = spacy.load("en_core_web_sm")
#
# Get word's lemma
#
def getLemma(word):
global spacyMorph
lemmaOutput = spacyMorph(str(word))
return lemmaOutput
#
# Worker's logic
#
def workerNormalize(lock, conn, params):
documentCount = 1
for filenameRaw in params[1]:
documentTotal = len(params[1])
documentID = int(os.path.basename(filenameRaw).split('.')[0])
# Send to the main process the worker's current progress
if not lock is None:
lock.acquire()
try:
statusMessage = "WORKING:{:d},{:d},".format(documentID, documentCount)
conn.send(statusMessage)
documentCount += 1
finally:
lock.release()
else:
print(statusMessage)
# ----------------
# Some code is excluded for clarity sake
# I've got a "wordList" from file "filenameRaw"
# ----------------
wordCount = 1
wordTotalCount = len(wordList)
for word in wordList:
lemma = getLemma(word)
wordCount += 1
# ----------------
# Then I collect all lemmas and save it to another text file
# ----------------
# Here I'm trying to reduce memory usage
del wordList
del word
gc.collect()
if __name__ == '__main__':
lock = Lock()
processList = []
# ----------------
# Some code is excluded for clarity sake
# Here I'm getting full list of files "fileTotalList" which I need to lemmatize
# ----------------
while cursorEnd < (docTotalCount + stepSize):
fileList = fileTotalList[cursorStart:cursorEnd]
# ----------------
# Create workers and populate it with list of files to process
# ----------------
processData = {}
processData['total'] = len(fileList) # worker total progress
processData['count'] = 0 # worker documents done count
processData['currentDocID'] = 0 # current document ID the worker is working on
processData['comment'] = '' # additional comment (optional)
processData['con_parent'], processData['con_child'] = Pipe(duplex=False)
processName = 'worker ' + str(count) + " at " + str(cursorStart)
processData['handler'] = Process(target=workerNormalize, name=processName, args=(lock, processData['con_child'], [processName, fileList]))
processList.append(processData)
processData['handler'].start()
cursorStart = cursorEnd
cursorEnd += stepSize
count += 1
# ----------------
# Run the monitor to look after the workers
# ----------------
while True:
runningCount = 0
#Worker communication format:
#STATUS:COMMENTS
#STATUS:
#- WORKING - worker is working
#- CLOSED - worker has finished his job and closed pipe-connection
#COMMENTS:
#- for WORKING status:
#DOCID,COUNT,COMMENTS
#DOCID - current document ID the worker is working on
#COUNT - count of done documents
#COMMENTS - additional comments (optional)
# ----------------
# Run through the list of workers ...
# ----------------
for i, process in enumerate(processList):
if process['handler'].is_alive():
runningCount += 1
# ----------------
# .. and check if there is somethng in the PIPE
# ----------------
if process['con_parent'].poll():
try:
message = process['con_parent'].recv()
status = message.split(':')[0]
comment = message.split(':')[1]
# ----------------
# Some code is excluded for clarity sake
# Update worker's information and progress in "processList"
# ----------------
except EOFError:
print("EOF----")
# ----------------
# Some code is excluded for clarity sake
# Here I draw some progress lines per workers
# ----------------
else:
# worker has finished his job. Close the connection.
process['con_parent'].close()
# Whait for some time and monitor again
sleep(PARAM['MONITOR_REFRESH_FREQUENCY'])
print("================")
print("**** DONE ! ****")
print("================")
# ----------------
# Here I'm measuring memory usage to find the most "gluttonous" part of the code
# ----------------
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
print("[ Memory trace - Top 10 ]")
for stat in top_stats[:10]:
print(stat)
'''