Многопоточные вставки Numpy - PullRequest
1 голос
/ 28 декабря 2011

Я рассматриваю возможность создания таблицы с номерами в качестве базы данных ключ / значение. Входы / обновления будут многопоточными.

исследуя идею, Проблема: остановит ли GIL theads и разрешит только одно обновление за раз. Проблема: может ли многослойная таблица (таблица) быть смешанной.

Ответы [ 2 ]

4 голосов
/ 28 декабря 2011

Некоторые функции numpy не являются атомарными, поэтому, если два потока будут работать с одним и тем же массивом, вызывая некоторые неатомарные функции numpy, массив станет искаженным, потому что порядок операций будет смешан в некоторых непредвиденных путь.

Есть много примеров, но для конкретики numpy.apply_along_axis - это длинная последовательность операторов Python, явно не атомарная.

GIL вам не поможет, так как он может остановить один поток, пока он только частично выполняет какую-то неатомарную функцию numpy, а затем запустить другой поток, работающий с тем же массивом ...

Таким образом, чтобы быть потокобезопасным, вам нужно будет использовать threading.Lock и работать с массивом только после получения блокировки:

with lock:
    arr = ...

Необходимость везде использовать блокировку ставит под сомнение, есть ли какая-то польза от работы нескольких потоков в одном массиве. Обратите внимание, что иногда многопоточность при проблемах с процессором может привести к снижению производительности по сравнению с аналогичной однопоточной версией.

См. Также ParallelProgramming с вики-страницей numpy и scipy для получения дополнительных альтернатив и обсуждения.

1 голос
/ 20 октября 2015

Мне просто нужно было это, так что я написал это .. Собираюсь попробовать это сейчас, так что не уверен, работает ли он еще как ожидалось ..

class LockedNumpyArray(object):
    """
    Thread safe numpy array
    """
    def __init__(self):
        self.lock = threading.Lock()
        self.val = None

    def __get__(self, obj, objtype):
        self.lock.acquire()
        if self.val != None:
            ret_val = self.val.copy()
        else:
            ret_val = None
        self.lock.release()
        # print('getting', ret_val)
        return ret_val

    def __set__(self, obj, val):
        self.lock.acquire()
        # print('setting', val)
        self.val = val.copy()
        self.lock.release()

Это класс для массива numpy.Затем у меня есть класс для управления, потому что позже я хочу, чтобы работали более многопоточные массивы.

class CaptureControl():
    """
    Shared class to control source capture execution
    """
    frame = LockedNumpyArray()

    def __init__(self):
        print(self.frame)
        self.frame = np.array([2])
        print(self.frame)

В конце я поместил экземпляр этого класса CaptureControl в потоки следующим образом.

class CaptureThread(threading.Thread):
    """
    Thread running source capturing
    """
    def __init__(self, capture_control):
        threading.Thread.__init__(self)
        self.capture_control = capture_control
        self.sleepTime = 0.01
    def run(self):
        while True:
            self.capture_control.capture_frame()
            time.sleep(self.sleepTime)

if __name__ == '__main__':
    capture_control = CaptureControl()
    capture_thread = CaptureThread(capture_control)
    capture_thread.start()

Я буду рад, если кто-нибудь поделится своими мыслями об этом решении, не считая time.sleep в функции запуска потока, поскольку это только пример;).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...