Многопроцессорная обработка большого набора данных в Python (поиск дубликатов) - PullRequest
0 голосов
/ 18 декабря 2018

У меня есть файл json, из которого я хочу удалить дублирующиеся строки, но он слишком велик, чтобы поместиться в память.Я нашел способ сделать это, но думаю, что это не самый лучший способ.

Моя проблема в том, что он работает за 8 минут для набора данных 12 ГБ.Но требование состоит в том, чтобы масштабировать код так, чтобы он мог работать на наборе данных объемом 100 ГБ.Любые указатели о том, как это сделать?Должен ли я использовать многопоточность или многопроцессорность в Python для достижения этой цели?Или любой другой метод?

Это код:

import json import time

"" "Этот запрос содержит бизнес-логику для определения дубликатов и создания выходного файла.для дальнейшей обработки "" "

класс BusinessService:

""" The method identiifes the duplicate """

def service(ipPath,opPath):

        start_time = time.time()    #We start the timer to see how much time the method takes to work #

        uniqueHandleSet = set();     #Creating a set to store unique values #

        try:
            duplicateHandles = open(opPath,'w+',encoding='utf-8')     #Opening and creating an output file to catch the duplicate hanndles #                     
            with open(ipPath,buffering = 200000000,encoding = 'utf-8') as infile:     #Reading the JSON File by buffering and using 20mb as it is too big to read at once #       


                for line in infile:

                    tweetJsonObject = json.loads(line);

                    if tweetJsonObject["name"] not in uniqueHandleSet:

                        uniqueHandleSet.add(tweetJsonObject["name"]);
                    else:
                            duplicateHandles.write(line);



            print("--- %s seconds --- memory 200mb while buffering" % (time.time() - start_time));  #Printing the total time required to execute 

        except:
            print("Error")

        finally:
            duplicateHandles.close();

Ответы [ 2 ]

0 голосов
/ 21 декабря 2018
from multiprocessing import Process, Manager, Queue
import json

output = open ('output', 'w+', encoding='utf-8')

def findDuplicate(inputQueue, uniqueValues, output):
    for line in iter(inputQueue.get, 'STOP'): #get line from Queue, stop if 'STOP' is received
        if line['name'] not in uniqueValues: # check if duplicate
            uniqueValues.append(line)
        else:
            output.write(line) # store it

manager = Manager() # get a new SyncManager
uniqueValues = manager.list() # handle for shared list
duplicates = manager.list() # a 2nd handle for a shared list
inputQueue = Queue() # a queue to provide tasks to the processes

# setup workers, provide shared lists and tasks
numProc = 4
process = [Process(target=findDuplicate,
                      args=(inputQueue, uniqueValues, output)) for x in range(numProc)]

# start processes, they will idle if nothing is in queue
for p in process:
    p.start()

with open('username_sample.jsonrows', buffering= 20000000, encoding='utf-8') as f:
    for line in f:
        inputQueue = json.loads(line, block=True) # put line in queue, only if free slot avaible
for p in process:
    inputQueue.put('STOP') # signal workers to stop as no further input

    # wait for processes to finish
for p in process:
    p.join()


output.close()

Я пытался сделать это, но получил ошибку TypeError: невозможно сериализовать объект '_io.TextIOWrapper'

0 голосов
/ 19 декабря 2018

Чтобы масштабировать его, вам понадобятся очереди для подачи нескольких процессов и два общих списка, чтобы отслеживать ваши результаты.Основная идея заключается в том, чтобы построчно передавать файл в очередь, которая впоследствии обрабатывается некоторыми пользовательскими процессами.Однако эти процессы используют два списка для хранения промежуточных результатов.Менеджер отвечает за синхронизацию между процессами.

Следующий код является лишь приблизительным, не проверенным на практике:

from multiprocessing import Process, Manager, Queue

def findDuplicate(inputQueue, uniqueValues, duplicates):
    for line in iter(inputQueue.get, 'STOP'): #get line from Queue, stop if 'STOP' is received
        if line not in uniqueValues: # check if duplicate
            uniqueValues.append(line)
        else:
            duplicates.append(line) # store it

manager = Manager() # get a new SyncManager
uniqueValues = manager.list() # handle for shared list
duplicates = manager.list() # a 2nd handle for a shared list
inputQueue = Queue() # a queue to provide tasks to the processes

# setup workers, provide shared lists and tasks
numProc = 4
process = [Process(target=findDuplicate,
                      args=(inputQueue, uniqueValues, duplicates)) for x in range(numProc)]

# start processes, they will idle if nothing is in queue
for p in process:
    p.start()

with open(ipPath) as f:
    for line in f:
        inputQueue.put(line, block=True) # put line in queue, only if free slot avaible
for p in process:
    inputQueue.put('STOP') # signal workers to stop as no further input

    # wait for processes to finish
for p in process:
    p.join()
...