Как ускорить импорт продуктов odoo с помощью многопоточности? - PullRequest
0 голосов
/ 27 сентября 2019

Я столкнулся с проблемой реализации собственного импортера с использованием нескольких потоков для ускорения процесса.Это работает, если импортированный набор данных не слишком большой.Однако мне нужно импортировать 100 тыс. Кортежей в мою базу данных.Система просто прекращает выполнение кода без каких-либо сообщений об ошибках или выходных данных журнала.

В следующих листингах кода вы можете увидеть мою реализацию, использующую модуль threading для порождения новых потоков.Затем я использую модуль queue, чтобы поставить в очередь пакеты данных для обработки.Наконец, я использую пользовательский lock в своем выходном словаре, поэтому у меня не возникает никаких проблем.

  import_result = {"ids": [], "messages": []}
  import_result_lock = Lock()
  batch_size = 100
  num_workers = 2
  q = Queue()
  # data contains all tuples
  for _ in range(min(num_workers, int(len(data) / batch_size))):
    t = Thread(
      target=self._import_worker, 
      args=(q, get_columns, import_result, import_result_lock)
    )
    t.deamon = True
    t.start()

  for sublist in [data[i:i + batch_size] for i in range(0, len(data), batch_size)]:
    q.put(sublist)

  q.join()

Каждый поток выполняет метод _import_worker:

def _import_worker(self, queue, get_columns, import_result, import_result_lock):
        _, user, context = self.env.args
        with api.Environment.manage():
            with sql_db.db_connect(self.env.cr.dbname).cursor() as new_cr:
                new_env = api.Environment(new_cr, user, context)
                model = new_env['product.template']

                model = model.with_context(import_file=True, name_create_enabled_fields=True)

                while True:
                    data = queue.get()
                    _logger.info('Thread started processing of batch with length: {}'.format(len(data)))
                    temp_dict = {"ids": [], "messages": []}

                    temp_dict = model.load(get_columns(), data)

                    if (len(temp_dict['ids'])):
                        _logger.info('Committing cursor changes')
                        new_cr.commit()
                        # _logger.info('Resetting env')
                        # new_env.reset()

                    queue.task_done()

                    try:
                        import_result_lock.acquire()
                        import_result['ids'].extend(temp_dict['ids'])
                        import_result['messages'].extend(temp_dict['messages'])
                    finally:
                        import_result_lock.release()
        _logger.info('Worker finished')
        return temp_dict

Я ожидал импортазакончить в мгновение ока (не на самом деле, но вы поняли).Однако это выглядит намного медленнее, чем поведение по умолчанию, где я просто беру весь объект data и запихиваю его в метод model.load().

Возможно, существует проблема с созданием целых сред для каждогонить?Но мне определенно нужен отдельный курсор базы данных.Также профилирование показало, что узким местом остается метод model.load().

Заранее спасибо и наилучшие пожелания

...