Это скрипт для расчета гистограммы, и я считаю, что lib csv.py
занимает больше всего времени.Как я могу запустить его параллельно?
Размер входного файла samtools.depth.gz
равен 14 ГБ, содержит около 3 миллиардов строк.
SamplesList = ('Sample_A', 'Sample_B', 'Sample_C', 'Sample_D')
from collections import Counter
cDepthCnt = {key:Counter() for key in SamplesList}
cDepthStat = {key:[0,0] for key in SamplesList} # x and x^2
RecordCnt,MaxDepth = inStat('samtools.depth.gz')
print('xxx')
def inStat(inDepthFile):
import gzip
import csv
RecordCnt = 0
MaxDepth = 0
with gzip.open(inDepthFile, 'rt') as tsvfin:
tsvin = csv.DictReader(tsvfin, delimiter='\t', fieldnames=('ChrID','Pos')+SamplesList )
RecordCnt += 1
for row in tsvin:
for k in SamplesList:
theValue = int(row[k])
if theValue > MaxDepth:
MaxDepth = theValue
cDepthCnt[k][theValue] += 1
cDepthStat[k][0] += theValue
cDepthStat[k][1] += theValue * theValue
return RecordCnt,MaxDepth
Существуют способы считывания огромных файлов на куски и распределения их со списком, например https://stackoverflow.com/a/30294434/159695:
bufsize = 65536
with open(path) as infile:
while True:
lines = infile.readlines(bufsize)
if not lines:
break
for line in lines:
process(line)
Однако csv.DictReader
принимает только дескрипторы файлов.
Есть способ разбить временные файлы на https://gist.github.com/jbylund/c37402573a896e5b5fc8, интересно, смогу ли я использовать fifo, чтобы сделать это на лету.
Я просто нахожу csv.DictReader
принимает любой объект, который поддерживает протокол итератора, и возвращает строку каждый раз, когда вызывается его метод next () - подходят файловые объекты и объекты списка.
Я должен изменить inStat()
напринять строки.Не могли бы вы помочь мне завершить statPool()
?
def statPool(inDepthFile):
import gzip
RecordCnt = 0
MaxDepth = 0
cDepthCnt = {key:Counter() for key in SamplesList}
cDepthStat = {key:[0,0,0,0,0] for key in SamplesList} # x and x^2
with gzip.open(inDepthFile, 'rt') as tsvfin:
while True:
lines = tsvfin.readlines(ChunkSize)
if not lines:
break
with Pool(processes=4) as pool:
res = pool.apply_async(inStat,[lines])
iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat = res.get()
RecordCnt += iRecordCnt
if iMaxDepth > MaxDepth:
MaxDepth = iMaxDepth
for k in SamplesList:
cDepthCnt[k].update(icDepthCnt[k])
cDepthStat[k][0] += icDepthStat[k][0]
cDepthStat[k][1] += icDepthStat[k][1]
return RecordCnt,MaxDepth,cDepthCnt,cDepthStat
Я думаю, asyncio.Queue
, кажется, будет хорошим способом передачи нескольких csv.DictReader
рабочих.