Как использовать многопроцессорность в csv.DictReader? - PullRequest
0 голосов
/ 12 февраля 2019

Это скрипт для расчета гистограммы, и я считаю, что lib csv.py занимает больше всего времени.Как я могу запустить его параллельно?

Размер входного файла samtools.depth.gz равен 14 ГБ, содержит около 3 миллиардов строк.

SamplesList = ('Sample_A', 'Sample_B', 'Sample_C', 'Sample_D')
from collections import Counter
cDepthCnt = {key:Counter() for key in SamplesList}
cDepthStat = {key:[0,0] for key in SamplesList} # x and x^2

RecordCnt,MaxDepth = inStat('samtools.depth.gz')
print('xxx')

def inStat(inDepthFile):
    import gzip
    import csv
    RecordCnt = 0
    MaxDepth = 0
    with gzip.open(inDepthFile, 'rt') as tsvfin:
        tsvin = csv.DictReader(tsvfin, delimiter='\t', fieldnames=('ChrID','Pos')+SamplesList )
        RecordCnt += 1
        for row in tsvin:
            for k in SamplesList:
                theValue = int(row[k])
                if theValue > MaxDepth:
                    MaxDepth = theValue
                cDepthCnt[k][theValue] += 1
                cDepthStat[k][0] += theValue
                cDepthStat[k][1] += theValue * theValue
    return RecordCnt,MaxDepth

cProfile

Существуют способы считывания огромных файлов на куски и распределения их со списком, например https://stackoverflow.com/a/30294434/159695:

bufsize = 65536
with open(path) as infile: 
    while True:
        lines = infile.readlines(bufsize)
        if not lines:
            break
        for line in lines:
            process(line)

Однако csv.DictReader принимает только дескрипторы файлов.

Есть способ разбить временные файлы на https://gist.github.com/jbylund/c37402573a896e5b5fc8, интересно, смогу ли я использовать fifo, чтобы сделать это на лету.


Я просто нахожу csv.DictReaderпринимает любой объект, который поддерживает протокол итератора, и возвращает строку каждый раз, когда вызывается его метод next () - подходят файловые объекты и объекты списка.

Я должен изменить inStat() напринять строки.Не могли бы вы помочь мне завершить statPool()?

def statPool(inDepthFile):
    import gzip
    RecordCnt = 0
    MaxDepth = 0
    cDepthCnt = {key:Counter() for key in SamplesList}
    cDepthStat = {key:[0,0,0,0,0] for key in SamplesList} # x and x^2
    with gzip.open(inDepthFile, 'rt') as tsvfin:
        while True:
            lines = tsvfin.readlines(ChunkSize)
            if not lines:
                break
            with Pool(processes=4) as pool:
                res = pool.apply_async(inStat,[lines])
                iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat = res.get()
            RecordCnt += iRecordCnt
            if iMaxDepth > MaxDepth:
                MaxDepth = iMaxDepth
            for k in SamplesList:
                cDepthCnt[k].update(icDepthCnt[k])
                cDepthStat[k][0] += icDepthStat[k][0]
                cDepthStat[k][1] += icDepthStat[k][1]
    return RecordCnt,MaxDepth,cDepthCnt,cDepthStat

Я думаю, asyncio.Queue, кажется, будет хорошим способом передачи нескольких csv.DictReader рабочих.

1 Ответ

0 голосов
/ 12 февраля 2019

Поиск в глобальном контексте занимает больше времени, чем поиск в локальном.

Вы делаете лот поисков - я предлагаю изменить ваш код на:

cDepthCnt = {key:Counter() for key in SamplesList}
cDepthStat = {key:[0,0] for key in SamplesList} # x and x^2

RecordCnt,MaxDepth = inStat('samtools.depth.gz', cDepthCnt, cDepthStat)
print('xxx')

def inStat(inDepthFile, depthCount, depthStat):
    # use the local depthCount, depthStat

чтобы ускорить эту часть на несколько процентов.

Параллельное выполнение при доступе к одним и тем же ключам снова и снова приведет к блокировке этих значений во избежание ошибок - блокировка / разблокировка также требует времени.Вы должны увидеть, если это быстрее.

Все, что вы делаете, это суммирование значений - вы можете разделить ваши данные и использовать 4 части для 4 (раз 2) различных словарей, а затем сложить 4 слова вваш глобальный, чтобы избежать блокировок.

...