Неполная многопоточность? - PullRequest
0 голосов
/ 12 июля 2019

У меня есть следующий код Python:

pool = ThreadPool(32)
l = defaultdict(lambda: 0)

def func(e):
    if "$" in e:
        l["included"] += 1
    else:
        l["not_included"] += 1

with open(file_name) as file:
    data = file_name.readlines()

pool.map(func, data)

with open("output/logs.txt") as file:
    file.write(l)

По сути, он ищет строки в файле, которые содержат символ "$". Однако каждый раз, когда я выполняю код, этот выходной файл отличается, что означает, что добавляемый список l отличается. Что может быть причиной этого?

Запутанная часть в том, что результирующий defaultdict l отличается от каждого исполнения. Иногда l = {"included": 772, "not_included": 9992}, иногда l = {"included": 878, "not_included": 6907} и т. Д.

Ответы [ 2 ]

0 голосов
/ 12 июля 2019

Поскольку map() возвращает значение из потоков, поэтому я использую его для возврата True или False, а затем подсчитываю их в главном потоке.Таким образом, потоки не пытаются использовать один и тот же l, что часто создает проблемы.

from multiprocessing.pool import ThreadPool

pool = ThreadPool(32)

def func(e):
    return "$" in e

data = ['$','$','x','$','$','x','$','$','x','x','$','x','x']

results = pool.map(func, data)

print(results)

l = dict()
l["included"] = results.count(True)
l["not included"] = results.count(False)

print(l)

Результат

[True, True, False, True, True, False, True, True, False, False, True, False, False]
{'included': 7, 'not included': 6}

Проблема в том, что один поток изменяет значение, ноон не блокирует доступ к переменной, поэтому в то же время другой поток получает старое значение вместо нового, и он изменяет это старое значение и возвращает его в переменную, что приводит к неверному результату.Потоки должны будут использовать блокировку или семафору, чтобы заблокировать доступ к переменной.

0 голосов
/ 12 июля 2019

Каждая операция func занимает разное количество времени. Рассмотрим:

>>> from multiprocessing.dummy import Pool as ThreadPool
>>> from random import randrange
>>> min_time = 0
>>> max_time = 4
>>> import time
>>> pool = ThreadPool(4)
>>> lst = []
>>> def func(e):
...     time.sleep(randrange(min_time, max_time))
...     lst.append(e ** 2)
... 
>>> data = list(range(20))
>>> pool.map(func, data)
>>> lst
[0, 4, 1, 36, 9, 16, 100, 121, 64, 81, 144, 49, 25, 169, 256, 289, 196, 225, 324, 361]

Каждый поток вызывает func, но занимает разное время. Поскольку одновременно запущено несколько потоков, они не гарантированно добавляются в любом порядке. Может быть, первый поток начинается с ввода 0, но занимает 2 секунды, но второй поток начинается с 1 и занимает меньше секунды. В этом случае результат вызова func с 1 добавляется первым.

Редактировать: я предполагаю, что длина файла одинакова, а порядок - это разница.

...