Я столкнулся с проблемой реализации собственного импортера с использованием нескольких потоков для ускорения процесса.Это работает, если импортированный набор данных не слишком большой.Однако мне нужно импортировать 100 тыс. Кортежей в мою базу данных.Система просто прекращает выполнение кода без каких-либо сообщений об ошибках или выходных данных журнала.
В следующих листингах кода вы можете увидеть мою реализацию, использующую модуль threading
для порождения новых потоков.Затем я использую модуль queue
, чтобы поставить в очередь пакеты данных для обработки.Наконец, я использую пользовательский lock
в своем выходном словаре, поэтому у меня не возникает никаких проблем.
import_result = {"ids": [], "messages": []}
import_result_lock = Lock()
batch_size = 100
num_workers = 2
q = Queue()
# data contains all tuples
for _ in range(min(num_workers, int(len(data) / batch_size))):
t = Thread(
target=self._import_worker,
args=(q, get_columns, import_result, import_result_lock)
)
t.deamon = True
t.start()
for sublist in [data[i:i + batch_size] for i in range(0, len(data), batch_size)]:
q.put(sublist)
q.join()
Каждый поток выполняет метод _import_worker:
def _import_worker(self, queue, get_columns, import_result, import_result_lock):
_, user, context = self.env.args
with api.Environment.manage():
with sql_db.db_connect(self.env.cr.dbname).cursor() as new_cr:
new_env = api.Environment(new_cr, user, context)
model = new_env['product.template']
model = model.with_context(import_file=True, name_create_enabled_fields=True)
while True:
data = queue.get()
_logger.info('Thread started processing of batch with length: {}'.format(len(data)))
temp_dict = {"ids": [], "messages": []}
temp_dict = model.load(get_columns(), data)
if (len(temp_dict['ids'])):
_logger.info('Committing cursor changes')
new_cr.commit()
# _logger.info('Resetting env')
# new_env.reset()
queue.task_done()
try:
import_result_lock.acquire()
import_result['ids'].extend(temp_dict['ids'])
import_result['messages'].extend(temp_dict['messages'])
finally:
import_result_lock.release()
_logger.info('Worker finished')
return temp_dict
Я ожидал импортазакончить в мгновение ока (не на самом деле, но вы поняли).Однако это выглядит намного медленнее, чем поведение по умолчанию, где я просто беру весь объект data
и запихиваю его в метод model.load()
.
Возможно, существует проблема с созданием целых сред для каждогонить?Но мне определенно нужен отдельный курсор базы данных.Также профилирование показало, что узким местом остается метод model.load()
.
Заранее спасибо и наилучшие пожелания