Процессы не параллельны - PullRequest
0 голосов
/ 23 сентября 2019

Я работаю над инструментом классификации с использованием дампов dbpedia и столкнулся с проблемой производительности (несколько дней обработки).Я хочу классифицировать каждую статью в Википедии - короче говоря, я объединяю статью с похожими статьями и объединяю результаты нескольких классификаторов.Я хотел использовать свой компьютер (i7-6700K) и запустить программу на нескольких ядрах / процессах, но я не могу заставить ее работать.Я закончил с несколькими процессами, но одновременно запущен только один.

Я использую ubuntu as и windows subsystem.

диспетчер задач

Входной файл выглядит следующим образом:

# started 2016-06-16T01:23:53Z
<http://dbpedia.org/resource/Achilles> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#Thing> .
<http://dbpedia.org/resource/An_American_in_Paris> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#Thing> .
<http://dbpedia.org/resource/Actrius> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://dbpedia.org/ontology/Film> .
<http://dbpedia.org/resource/Animalia_(book)> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://dbpedia.org/ontology/Book> .
<http://dbpedia.org/resource/Agricultural_science> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#Thing> .
<http://dbpedia.org/resource/Alain_Connes> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://dbpedia.org/ontology/Scientist> .
<http://dbpedia.org/resource/Allan_Dwan> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://dbpedia.org/ontology/Person> .

Вот мой код:

def processWrapper(self, resourceInput, chunkStart, chunkSize):
    results = []
    with open(resourceInput, 'rb') as f:
        f.seek(chunkStart)
        lines = f.read(chunkSize).splitlines()
        logger.info(lines)
        for line in lines:
            if line.strip().startswith(b"#"):
                continue

            x = line.strip().split(b" ")
            logger.info(line.strip())
            resource = x[0][1:-1].decode("utf-8")
            if regSearch(r"__\d+", resource):
                continue
            logger.info(resource)
            result = self.resultTresholding(self.aggregateClassfierResults(resource), self.tresholdStrategy, self.threshold)
            finalType = self.finalTypeSelection(self.finalTypeSelectionStrategy, self.tresholdStrategy, result)
            results.append(tuple(result, finalType))
        return results

def chunkify(self, fname, size=1024*1024):
    fileEnd = os.path.getsize(fname)
    print(fileEnd)
    with open(fname, 'rb') as f:
        chunkEnd = f.tell()
        while True:
            chunkStart = chunkEnd
            f.seek(size, 1)
            f.readline()
            chunkEnd = f.tell()
            yield chunkStart, chunkEnd - chunkStart
            if chunkEnd > fileEnd:
                break

def processResources(self, resourceInput):
    '''

    '''
    pool = mp.Pool(4)
    # graph = rdflib.Graph()

    jobs = []

    # create jobs
    for chunkStart, chunkSize in self.chunkify(str(resourceInput), 100):
        logger.info(f"chunkStart - {chunkStart}")
        logger.info(f"chunkSize - {chunkSize}")
        jobs.append(pool.apply_async(self.processWrapper, (resourceInput, chunkStart, chunkSize)))

    for job in jobs:
        for result in job.get():
            with open(s.resultFile, 'a') as g:
                g.write(f"<{result[0]}> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <{result[1]}>  .\n")
    pool.close()

Чего мне не хватает?Я впервые использую multiprocessing ...

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...