Синхронизированный счетчик двумерных массивов с блокировками - PullRequest
0 голосов
/ 27 апреля 2019

Я хочу распараллелить метод, который работает с совместно используемым двумерным массивом.

Мое оригинальное приложение является частью исследования и очень сложным, однако я создал игрушечный пример, по сути повторяющий проблемы.

Есть магазин одежды, который продает одежду разных размеров и цветов. Я представляю инвентарь этого магазина в виде 2D матрицы, где self.supply_arr[i][j] представляет общую доступность одежды size i и color j. У меня есть несколько клиентов, пытающихся купить в магазине. Магазин не должен продавать одежду больше, чем ее инвентарь. Ниже я показываю непараллельный пример.

import numpy as np


class ClothStore(object):
    def __init__(self, num_customers):
        self.supply_arr = np.random.randint(5, size=(2,2))
        self.sold_arr = np.zeros((2,2), dtype=int)
        self.num_customers = num_customers

    def make_purchase(self, size, color):
        left = self.supply_arr[size][color] - self.sold_arr[size][color]
        if left > 0:
            self.sold_arr[size][color] += 1
            return True
        else:
            return False

    def run(self):
        for customer in xrange(self.num_customers):
            size = np.random.randint(2)
            color = np.random.randint(2)

            purchase = self.make_purchase(size, color)

            if purchase:
                print "Customer: {} made successful purchase".format(customer)

if __name__ == "__main__":
    store = ClothStore(100)
    store.run()

    print "Supply Arr: {}".format(store.supply_arr)
    print "Sold Arr: {}".format(store.sold_arr)

Я попытался распараллелить метод run(self), используя pathos и представляя self.supply_arr как np.empty((2,2), dtype=object), где каждый элемент, который я инициализирую как multiprocessing.Value. Тем не менее, я не смог заставить его работать. Любая помощь будет оценена. Спасибо.

1 Ответ

0 голосов
/ 27 апреля 2019

Мне удалось самостоятельно решить вопрос окольным путем.Это не самый элегантный способ, но он работает.Буду очень признателен за помощь в создании более элегантного.

import numpy as np
from pathos.multiprocessing import ProcessingPool as Pool
from multiprocess import Manager


class ClothStoreNew(object):
    def __init__(self, num_customers):
        self.supply_arr = np.random.randint(5, size=(2, 2))
        self.num_customers = num_customers

    def make_purchase(self, arg):
        sold_dict = arg[0]
        i = arg[1]

        size = self.demand[i][1]
        color = self.demand[i][2]
        sold = sold_dict.get((size, color), 0)
        if self.supply_arr[size][color] > sold:
            sold_dict[(size, color)] = sold + 1

    def run(self):
        m = Manager()
        sold_dict = m.dict()
        pool = Pool(processes=100)
        self.demand = []
        for customer in xrange(self.num_customers):
            size = np.random.randint(1)
            color = np.random.randint(1)
            self.demand.append([customer, size, color])

        pool.map(self.make_purchase, ([sold_dict, i] for i in xrange(self.num_customers)))
        pool.close()
        pool.join()
        return dict(sold_dict)


if __name__ == "__main__":
    store = ClothStoreNew(20)
    sold_dict = store.run()
    print "Supply Arr: {}".format(store.supply_arr)
    print "Sold Dict: {}".format(sold_dict)

Как видите, я использую manager.dict() для синхронизации.Я хотел бы использовать manager.list(), но это не похоже на работу.Кроме того, при использовании Manager блокирует весь словарь для каждого обновления, идеальным решением было бы то, которое блокирует каждый отдельный ключ dict (или каждую отдельную ячейку 2D-матрицы) за раз, так что процессы работают на других ячейках.не надо ждать.

...