У меня есть файл с транзакциями 1м + (csv), который мне нужно очистить от лишних пробелов и проверить тип ввода (например: int, float, ...).
Я передаю куски строк каждый раз и запускаю поток для обработки данных. Когда поток завершается, он обрабатывает другой блок и так далее, пока он не будет завершен. но проблема в том, что при окончательном просмотре очереди она имеет только последнюю обработанную строку * количество строк (1,01 млн транзакций).
Я пытался объявить очередь глобально, ничего не изменилось.
Я пытался напечатать результаты до того, как они попадут в очередь, они показывают тему в правильных результатах, но неправильно поставили ее в своей очереди.
Вместо этого я попытался использовать глобальный список, но это настоятельно не рекомендуется из-за характера многопоточности.
вызов потока:
threads = []
for chunck in reader:
threads.append(threading.Thread(target=clean , args=([chunck, queue])))
threads[-1].start()
for t in threads:
t.join()
функция очистки:
def clean(i, queue):
details = {}
for index, column in i.iterrows():
for key,val in column.items():
if isinstance(val, str):
details[" ".join(key.split()).replace(" ","_").replace('.','').lower()] = " ".join(val.split())
else:
details[" ".join(key.split()).replace(" ","_").replace('.','').lower()] = val
queue.put(details)
# queue.task_done()
return queue
Я ожидаю, что общее количество строк будет очищено и помещено в очередь, чтобы я мог сгенерировать окончательно очищенный CSV. но теперь он дает мне файл с транзакциями 1,01 млн с тем же значением последней обработанной строки.