Потеря данных при записи в файлы асинхронными процессами (apply_async) - PullRequest
0 голосов
/ 21 октября 2018

У меня есть процесс, который выполняет следующие действия в Python 3.6.3: 1. Чтение данных из Oracle с помощью cx_Oracle (7.x) 2. Я использую большой размер чанка в курсоре, то есть 80000 строк.3. При чтении из курсора блоками, используя apply_async с пулом из 32 процессов, каждый процесс записывает в отдельный файл.4. Функция, вызванная вызовом apply_async, получает данные из очереди, открывает новый файл, записывает данные в файл и закрывает его.

Я всегда вижу случайную потерю данных в последнем выходе.Обычно это 80К + строк.Как будто один из процессов не записывает данные в файл, или некоторые процессы записывают меньше данных в свой отдельный файл.Я проверил, что я читаю правильное количество строк из базы данных, используя команду len (строки), чтобы убедиться, что все данные прочитаны перед записью в файл.

Код выглядит примерно так:

from __future__ import print_function

import cx_Oracle
import csv
import time
import datetime
from multiprocessing import Pool
from multiprocessing import Queue

pool = cx_Oracle.SessionPool(....., threaded = True)
queue1 = Queue()
def WriteRowsToFile(fname):
    rows=q.get()
    csvf = open(flname, 'w')
    csv_writer = csv.writer(csvf)
    csv_writer.writerows(rows)
    csv_writer.flush()
    csvf.close()
    return 0

def TheBigQuery():
    ctr = 0
    totalrows = 0
    fname='TestFile.txt'
    ppool = Pool(processes=32)
        conn = pool.acquire()
    cursor = conn.cursor()
    cursor.arraysize = 80000
       cursor.execute("""
      SELECT * FROM TEMP_EMP
                """)

    while True:
        rows = cursor.fetchmany()
        totalrows = totalrows + cursor.rowcount
        if not rows:
           break;
        q.put(rows)
        ctr = ctr + 1
        fname = 'Testfile'+str(ctr)+'.txt'

        result = ppool.apply_async(WriteRowsToFile,(fname,))     

    print("Total number of rows written : ")
    print(totalrows)

    ppool.close()
    ppool.terminate()
    return


TheBigQuery()

Я немного удивлен, увидев потерю данных в файлах, потому что каждый процесс записывает в свой файл каждый раз, когда он вызывается.Если процесс apply_async каким-либо образом не назначит задачу процессу, который занят записью в файл, и в этот период он не записывает новые данные.Как отладить это и исправить проблему?

Советы / указатели будут высоко ценится.Большое спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...