Как добавить данные в очередь с многопоточностью (занимает только последнюю строку и дублирует ее)? - PullRequest
0 голосов
/ 02 апреля 2019

У меня есть файл с транзакциями 1м + (csv), который мне нужно очистить от лишних пробелов и проверить тип ввода (например: int, float, ...).

Я передаю куски строк каждый раз и запускаю поток для обработки данных. Когда поток завершается, он обрабатывает другой блок и так далее, пока он не будет завершен. но проблема в том, что при окончательном просмотре очереди она имеет только последнюю обработанную строку * количество строк (1,01 млн транзакций).

Я пытался объявить очередь глобально, ничего не изменилось. Я пытался напечатать результаты до того, как они попадут в очередь, они показывают тему в правильных результатах, но неправильно поставили ее в своей очереди. Вместо этого я попытался использовать глобальный список, но это настоятельно не рекомендуется из-за характера многопоточности.

вызов потока:

threads = []
for chunck in reader:
   threads.append(threading.Thread(target=clean , args=([chunck, queue])))
   threads[-1].start()

for t in threads:
   t.join()

функция очистки:

def clean(i, queue):
    details = {}
    for index, column in i.iterrows():
        for key,val in column.items():
            if isinstance(val, str):
                details[" ".join(key.split()).replace(" ","_").replace('.','').lower()] =  " ".join(val.split())
            else:
                details[" ".join(key.split()).replace(" ","_").replace('.','').lower()] = val
        queue.put(details)
        # queue.task_done()


    return queue

Я ожидаю, что общее количество строк будет очищено и помещено в очередь, чтобы я мог сгенерировать окончательно очищенный CSV. но теперь он дает мне файл с транзакциями 1,01 млн с тем же значением последней обработанной строки.

1 Ответ

0 голосов
/ 02 апреля 2019

Поскольку я получил большую помощь от сообщества Python в Reddit.

Проблема заключалась в функции очистки: я объявил dict вне первого цикла, и я должен объявить его внутри первого цикла.

Код:

 def clean(i, queue):

    for index, column in i.iterrows():
        details = {}
        for key,val in column.items():
            if isinstance(val, str):
                details[" ".join(key.split()).replace(" ","_").replace('.','').lower()] =  " ".join(val.split())
            else:
                details[" ".join(key.split()).replace(" ","_").replace('.','').lower()] = val
        queue.put(details)
        # queue.task_done()


    return queue

Проблема возникла из-за того, что dict во всех случаях дублируется при завершении потоков.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...