Я работаю над инструментом классификации с использованием дампов dbpedia и столкнулся с проблемой производительности (несколько дней обработки).Я хочу классифицировать каждую статью в Википедии - короче говоря, я объединяю статью с похожими статьями и объединяю результаты нескольких классификаторов.Я хотел использовать свой компьютер (i7-6700K) и запустить программу на нескольких ядрах / процессах, но я не могу заставить ее работать.Я закончил с несколькими процессами, но одновременно запущен только один.
Я использую ubuntu as и windows subsystem.
диспетчер задач
Входной файл выглядит следующим образом:
# started 2016-06-16T01:23:53Z
<http://dbpedia.org/resource/Achilles> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#Thing> .
<http://dbpedia.org/resource/An_American_in_Paris> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#Thing> .
<http://dbpedia.org/resource/Actrius> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://dbpedia.org/ontology/Film> .
<http://dbpedia.org/resource/Animalia_(book)> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://dbpedia.org/ontology/Book> .
<http://dbpedia.org/resource/Agricultural_science> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#Thing> .
<http://dbpedia.org/resource/Alain_Connes> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://dbpedia.org/ontology/Scientist> .
<http://dbpedia.org/resource/Allan_Dwan> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://dbpedia.org/ontology/Person> .
Вот мой код:
def processWrapper(self, resourceInput, chunkStart, chunkSize):
results = []
with open(resourceInput, 'rb') as f:
f.seek(chunkStart)
lines = f.read(chunkSize).splitlines()
logger.info(lines)
for line in lines:
if line.strip().startswith(b"#"):
continue
x = line.strip().split(b" ")
logger.info(line.strip())
resource = x[0][1:-1].decode("utf-8")
if regSearch(r"__\d+", resource):
continue
logger.info(resource)
result = self.resultTresholding(self.aggregateClassfierResults(resource), self.tresholdStrategy, self.threshold)
finalType = self.finalTypeSelection(self.finalTypeSelectionStrategy, self.tresholdStrategy, result)
results.append(tuple(result, finalType))
return results
def chunkify(self, fname, size=1024*1024):
fileEnd = os.path.getsize(fname)
print(fileEnd)
with open(fname, 'rb') as f:
chunkEnd = f.tell()
while True:
chunkStart = chunkEnd
f.seek(size, 1)
f.readline()
chunkEnd = f.tell()
yield chunkStart, chunkEnd - chunkStart
if chunkEnd > fileEnd:
break
def processResources(self, resourceInput):
'''
'''
pool = mp.Pool(4)
# graph = rdflib.Graph()
jobs = []
# create jobs
for chunkStart, chunkSize in self.chunkify(str(resourceInput), 100):
logger.info(f"chunkStart - {chunkStart}")
logger.info(f"chunkSize - {chunkSize}")
jobs.append(pool.apply_async(self.processWrapper, (resourceInput, chunkStart, chunkSize)))
for job in jobs:
for result in job.get():
with open(s.resultFile, 'a') as g:
g.write(f"<{result[0]}> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <{result[1]}> .\n")
pool.close()
Чего мне не хватает?Я впервые использую multiprocessing
...