Сделайте метод в классе устойчивым к одновременному вызову - PullRequest
0 голосов
/ 31 октября 2019

У меня есть следующий класс утилит:

class RunningStatisticsVar:
    def __init__(self, ddof=0):
        self.mean = 0
        self.var = 0
        self.std = 0

        self._n = 0
        self._s = 0
        self._ddof = ddof

    def update(self, value):
        self._n += 1

        old_mean = self.mean
        self.mean += (value - old_mean) / self._n

        self._s += (value - old_mean) * (value - self.mean)
        self.var = self._s / (self._n - self._ddof) if self._n > self._ddof else 0
        self.std = np.sqrt(self.var)

Это вычисляет и хранит текущее среднее и стандартное значение (длинного) потока чисел. Это прекрасно работает, но, поскольку я помещаю класс в свою личную библиотеку, я хотел бы сделать его устойчивым к параллельному выполнению. Например, я хотел бы иметь возможность сделать следующее:

from joblib.parallel import Parallel, delayed

def execute_and_update(var):
    a = do_stuff()
    var.update(a)
    b, c = do_more_stuff()
    var.update(b)
    var.update(c)

stat = RunningStatisticsVar()
Parallel()(delayed(execute_and_update)(stat) for _ in range(1000))

и иметь вызовы update поточно-ориентированными.

Поиск в Google для этого дал мне много способов выполняет код одновременно, но я не нашел способа сделать мой класс безопасным для одновременного выполнения . В Java, IIRC, это можно сделать с помощью атомарных методов / классов, но я не думаю, что в Python это есть.

ОБНОВЛЕНИЕ

После комментария я обновил свой код, однако я 'я получаю сообщение об ошибке при попытке вызвать мой метод из Parallel:

from joblib.parallel import Parallel, delayed
import numpy as np
from threading import Lock

class RunningStatisticsVar:
  def __init__(self, ddof=0):
    self.mean = 0
    self.var = 0
    self.std = 0

    self._n = 0
    self._s = 0
    self._ddof = ddof

    self._lock = Lock()

  def update(self, value):
    with self._lock:
      self._n += 1

      old_mean = self.mean
      self.mean += (value - old_mean) / self._n

      self._s += (value - old_mean) * (value - self.mean)
      self.var = self._s / (self._n - self._ddof) if self._n > self._ddof else 0
      self.std = np.sqrt(self.var)

samples = np.random.uniform(0, 100, [1000])
s1 = RunningStatisticsVar()
s2 = RunningStatisticsVar()

for i in samples:
  s1.update(i)
Parallel(n_jobs=-1)(delayed(lambda x: s2.update(x))(i) for i in samples) #

print(s1.mean, s1.std)
print(s2.mean, s2.std)

Попытка запустить приведенный выше код дает мне следующую ошибку в строке, отмеченной #:

TypeError: невозможно выбрать объекты _thread.lock

...