У меня есть процесс, который выполняет следующие действия в Python 3.6.3: 1. Чтение данных из Oracle с помощью cx_Oracle (7.x) 2. Я использую большой размер чанка в курсоре, то есть 80000 строк.3. При чтении из курсора блоками, используя apply_async с пулом из 32 процессов, каждый процесс записывает в отдельный файл.4. Функция, вызванная вызовом apply_async, получает данные из очереди, открывает новый файл, записывает данные в файл и закрывает его.
Я всегда вижу случайную потерю данных в последнем выходе.Обычно это 80К + строк.Как будто один из процессов не записывает данные в файл, или некоторые процессы записывают меньше данных в свой отдельный файл.Я проверил, что я читаю правильное количество строк из базы данных, используя команду len (строки), чтобы убедиться, что все данные прочитаны перед записью в файл.
Код выглядит примерно так:
from __future__ import print_function
import cx_Oracle
import csv
import time
import datetime
from multiprocessing import Pool
from multiprocessing import Queue
pool = cx_Oracle.SessionPool(....., threaded = True)
queue1 = Queue()
def WriteRowsToFile(fname):
rows=q.get()
csvf = open(flname, 'w')
csv_writer = csv.writer(csvf)
csv_writer.writerows(rows)
csv_writer.flush()
csvf.close()
return 0
def TheBigQuery():
ctr = 0
totalrows = 0
fname='TestFile.txt'
ppool = Pool(processes=32)
conn = pool.acquire()
cursor = conn.cursor()
cursor.arraysize = 80000
cursor.execute("""
SELECT * FROM TEMP_EMP
""")
while True:
rows = cursor.fetchmany()
totalrows = totalrows + cursor.rowcount
if not rows:
break;
q.put(rows)
ctr = ctr + 1
fname = 'Testfile'+str(ctr)+'.txt'
result = ppool.apply_async(WriteRowsToFile,(fname,))
print("Total number of rows written : ")
print(totalrows)
ppool.close()
ppool.terminate()
return
TheBigQuery()
Я немного удивлен, увидев потерю данных в файлах, потому что каждый процесс записывает в свой файл каждый раз, когда он вызывается.Если процесс apply_async каким-либо образом не назначит задачу процессу, который занят записью в файл, и в этот период он не записывает новые данные.Как отладить это и исправить проблему?
Советы / указатели будут высоко ценится.Большое спасибо.