Как синхронизировать переменные инициализации в классе, используя Pool imap - PullRequest
0 голосов
/ 11 июля 2019

Надеюсь, я буду достаточно конкретен.Я запрограммировал парсер для конкретных данных устройства.У меня также есть архив, который я могу использовать, чтобы заполнить данные в новой системе или проверить парсер.Сейчас я тестирую, как ускорить весь процесс, используя multiproccesing, и в итоге получаю multiprocessing.Pool.

. Итак, у меня есть рабочая оболочка, подобная этой

def worker(packet):
    packet_object = None
    try:
        packet_object = MyParser(packet).parse()
    except Exception as e:
        # *** more code here ***

    return packet_object

Затем у меня есть генератордля данных:

def get_data():
    dir_companies = listdir(archive_dir)
    for company_dir in dir_companies:
        self.current_dir = company_dir
        dir_dates = listdir(archive_dir + company_dir + "/")
        for dir_date in dir_dates:
            archive_files = glob.glob(archive_dir + company_dir + "/" + dir_date + "/" + mask)
            for file in archive_files:
                file = file.replace("\\", "/")
                with lzma.open(file) as f:
                    matches = re.finditer(regex, str(f.read()), re.MULTILINE)
                    for matchNum, match in enumerate(matches, start=1):
                        for groupNum in range(0, len(match.groups())):
                            groupNum = groupNum + 1

                            out = "{group}".format(group=match.group(groupNum))
                            rows = out.split('),(')
                            for lines in rows:
                                for m in re.finditer(regex_internal, lines):
                                    try:
                                        packet = m.group('packet')
                                    except IndexError:
                                        print(f'regex error packet not found')
                                    yield unhexlify(bytes(packet, 'ascii'))

Тогда я могу просто запустить такой код:

data = get_data()
p = Pool()

for result in p.imap(worker, data):
    # look at the result here, store to db, whatever
    pass

Поскольку я не вижу внутри генератора или работника, я решил свернуть его в класс, чтобыввести некоторые счетчики, статистику и т.д ...

class PoolWorker:
    def __init__(self):
        self.start = dt.now()
        self.total_packets = 0
        self.error_packets = 0
        self.total_receipts = 0
        self.error_critical = 0
        self.total_files = 0
        self.db_size = None
        self.current_file = None
        self.current_dir = None

    def worker(self, packet):
        #  code above

    def get_data(self):

        # code above

тогда я запускаю такой код

if __name__ == '__main__':
    pw = PoolWorker()
    p = Pool()

    directory = None

    for res in p.imap_unordered(pw.worker, pw.get_data()):
        if directory is None:
            directory = pw.current_dir
        elif directory != pw.current_dir:
            directory = pw.current_dir
            pw.get_stats()

    pw.get_stats()

Моя проблема в том, что счетчики пакетов не перемещаются из 0 У меня естьпонять, почему это так (может быть, есть много (под) экземпляров класса PoolWorker), но я могу выяснить, как заставить эти счетчики работать

Я попытался добавить следующий код в класс, но это помоглоне помогает

    def __getstate__(self):
        return self.__dict__.copy()

    def __setstate__(self, dict):
        self.__dict__ = dict

Этот код должен анализировать около 1 миллиарда архивных пакетов, поэтому любое ускорение было бы неплохо.Самое смешное, что get_data(self) внутри PoolWorker класс генератора может корректно изменить self.current_file, self.current_dir, но worker(self, packet) не может.

...