потоки с питоном: проблема локальных переменных - PullRequest
2 голосов
/ 21 декабря 2011

У меня возникли проблемы с использованием модуля threading и scipy.stats.randint. Действительно, когда запускается несколько потоков, локальный массив (bootIndexs в приведенном ниже коде), похоже, используется для всех запущенных потоков.

Это повышенная ошибка

> Exception in thread Thread-559:
Traceback (most recent call last):
...
  File "..\calculDomaine3.py", line 223, in bootThread
    result = bootstrap(nbB, distMod)
  File "...\calculDomaine3.py", line 207, in bootstrap
    bootIndexs = spstats.randint.rvs(0, nbTirages-1, size = nbTirages)
  File "C:\Python27\lib\site-packages\scipy\stats\distributions.py", line 5014, in rvs
    return super(rv_discrete, self).rvs(*args, **kwargs)
  File "C:\Python27\lib\site-packages\scipy\stats\distributions.py", line 582, in rvs
    vals = reshape(vals, size)
  File "C:\Python27\lib\site-packages\numpy\core\fromnumeric.py", line 171, in reshape
    return reshape(newshape, order=order)
ValueError: total size of new array must be unchanged

А это мой код:

import threading
import Queue
from scipy import stats as spstats

nbThreads = 4

def test(nbBoots, nbTirages,  modules ):

    def bootstrap(nbBootsThread, distribModules) :

         distribMax = []            

         for j in range(nbBootsThread): 
             bootIndexs = spstats.randint.rvs(0, nbTirages-1, size = nbTirages) 
             boot = [distribModules[i] for i in bootIndexs]

             distribMax.append(max(boot))

         return distribMax

    q = Queue.Queue()

    def bootThread (nbB, distMod):
        result = bootstrap(nbB, distMod )
        q.put(result, False)
        q.task_done()

    works = []

    for i in range(nbThreads) :     
        works.append(threading.Thread(target = bootThread, args = (nbBoots//nbThreads, modules[:],) ))


    for w in works:
        w.daemon = True
        w.start()

    q.join()

        distMaxResult = []

        for j in range(q.qsize()):
            distMaxResult += q.get()

        return distMaxResult

class classTest:
    def __init__(self):
        self.launch()

    def launch(self):
        print test(100, 1000, range(1000) )

Спасибо за ваши ответы.

Ответы [ 2 ]

2 голосов
/ 21 декабря 2011

Действительно, при запуске нескольких потоков локальный массив (bootIndexs в приведенном ниже коде), похоже, используется для всех запущенных потоков.

В этом весь смысл потоков: легкийзадачи, которые разделяют все с их процессом нереста!:) Если вы ищете решение без разделения ресурсов, то вам, возможно, стоит взглянуть на модуль многопроцессорной обработки (имейте в виду, что порождение процесса в системе намного тяжелее, чем порождение потока).

Однако вернемся к вашей проблеме ... моя - чуть больше, чем выстрел в темноте, но вы можете попробовать изменить эту строку:

boot = [distribModules[i] for i in bootIndexs]

на:

boot = [distribModules[i] for i in bootIndexs.copy()]

(используя копию массива, а не сам массив).Это кажется маловероятной проблемой (вы просто перебираете массив, а не используете его), но это единственный момент, который я вижу, когда вы используете его в своем потоке, так что ...

Это, конечно,работает, если содержимое вашего массива не должно изменяться потоками, управляющими им.Если изменение значения «глобального» массива является правильным поведением, вам следует наоборот реализовать Lock(), чтобы запретить одновременный доступ к этому ресурсу.Ваши темы должны сделать что-то вроде:

lock.acquire()
# Manipulate the array content here
lock.release()
1 голос
/ 24 декабря 2011

У меня нет опыта работы с потоками, так что это может быть совершенно неверно.

scipy.stats.randint, как и другие дистрибутивы в scipy.stats, является экземпляром соответствующего класса распространения.Это означает, что каждый поток обращается к одному и тому же экземпляру.Во время вызова rvs устанавливается атрибут _size.Если другой поток с другим размером обращается к экземпляру за это время, вы получите ValueError, что размеры не совпадают при изменении формы.Для меня это звучит как состояние гонки.

Я бы порекомендовал использовать numpy.random непосредственно в этом случае (это вызов в scipy.stats.randint)

numpy.random.randint(min, max, self._size)

может быть, выповезет больше.

Если вам нужен дистрибутив, который недоступен в numpy.random, то вам нужно будет создавать новые экземпляры дистрибутива в каждом потоке, если мои предположения верны.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...