Надеюсь, я буду достаточно конкретен.Я запрограммировал парсер для конкретных данных устройства.У меня также есть архив, который я могу использовать, чтобы заполнить данные в новой системе или проверить парсер.Сейчас я тестирую, как ускорить весь процесс, используя multiproccesing
, и в итоге получаю multiprocessing.Pool
.
. Итак, у меня есть рабочая оболочка, подобная этой
def worker(packet):
packet_object = None
try:
packet_object = MyParser(packet).parse()
except Exception as e:
# *** more code here ***
return packet_object
Затем у меня есть генератордля данных:
def get_data():
dir_companies = listdir(archive_dir)
for company_dir in dir_companies:
self.current_dir = company_dir
dir_dates = listdir(archive_dir + company_dir + "/")
for dir_date in dir_dates:
archive_files = glob.glob(archive_dir + company_dir + "/" + dir_date + "/" + mask)
for file in archive_files:
file = file.replace("\\", "/")
with lzma.open(file) as f:
matches = re.finditer(regex, str(f.read()), re.MULTILINE)
for matchNum, match in enumerate(matches, start=1):
for groupNum in range(0, len(match.groups())):
groupNum = groupNum + 1
out = "{group}".format(group=match.group(groupNum))
rows = out.split('),(')
for lines in rows:
for m in re.finditer(regex_internal, lines):
try:
packet = m.group('packet')
except IndexError:
print(f'regex error packet not found')
yield unhexlify(bytes(packet, 'ascii'))
Тогда я могу просто запустить такой код:
data = get_data()
p = Pool()
for result in p.imap(worker, data):
# look at the result here, store to db, whatever
pass
Поскольку я не вижу внутри генератора или работника, я решил свернуть его в класс, чтобыввести некоторые счетчики, статистику и т.д ...
class PoolWorker:
def __init__(self):
self.start = dt.now()
self.total_packets = 0
self.error_packets = 0
self.total_receipts = 0
self.error_critical = 0
self.total_files = 0
self.db_size = None
self.current_file = None
self.current_dir = None
def worker(self, packet):
# code above
def get_data(self):
# code above
тогда я запускаю такой код
if __name__ == '__main__':
pw = PoolWorker()
p = Pool()
directory = None
for res in p.imap_unordered(pw.worker, pw.get_data()):
if directory is None:
directory = pw.current_dir
elif directory != pw.current_dir:
directory = pw.current_dir
pw.get_stats()
pw.get_stats()
Моя проблема в том, что счетчики пакетов не перемещаются из 0 У меня естьпонять, почему это так (может быть, есть много (под) экземпляров класса PoolWorker
), но я могу выяснить, как заставить эти счетчики работать
Я попытался добавить следующий код в класс, но это помоглоне помогает
def __getstate__(self):
return self.__dict__.copy()
def __setstate__(self, dict):
self.__dict__ = dict
Этот код должен анализировать около 1 миллиарда архивных пакетов, поэтому любое ускорение было бы неплохо.Самое смешное, что get_data(self)
внутри PoolWorker
класс генератора может корректно изменить self.current_file
, self.current_dir
, но worker(self, packet)
не может.