Pool.apply_async (). Get () вызывает ошибку рассылки _thread.lock - PullRequest
0 голосов
/ 07 октября 2018

Я недавно создал программу на python, которая очень выиграет от стратегии параллельных вычислений потребителя / производителя.Я пытался разработать модуль (Class), чтобы упростить реализацию такой стратегии обработки, но быстро столкнулся с проблемой.

Мой класс ProducerConsumer:

class ProducerConsumer(object):
    def __init__(self, workers_qt, producer, consumer, min_producer_qt=1):
        self.producer_functor = producer        # Pointer to the producer function
        self.consumer_functor = consumer        # Pointer to the consumer function

        self.buffer = deque([])                 # Thread-safe double-ended queue item for intermediate result buffer

        self.workers_qt = workers_qt
        self.min_producer_qt = min_producer_qt  # Minimum quantity of active producers (if enough remaining input data)

        self.producers = []                     # List of producers async results
        self.consumers = []                     # List of consumers async results

    def produce(self, params, callback=None):
        result = self.producer_functor(*params)  # Execute the producer function
        if callback is not None:
            callback()                           # Call the callback (if there is one)
        return result

    def consume(self, params, callback=None):
        result = self.consumer_functor(params)  # Execute the producer function
        if callback is not None:
            callback()                          # Call the callback (if there is one)
        return result

    # Map a list of producer's input data to a list of consumer's output data
    def map_result(self, producers_param):
        result = []                               # Result container
        producers_param = deque(producers_param)  # Convert input to double-ended queue (for popleft() member)

        with Pool(self.workers_qt) as p:          # Create a worker pool
            while self.buffer or producers_param or self.consumers or self.producers:  # Work remaining
                # Create consumers
                if self.buffer and (len(self.producers) >= self.min_producer_qt or not producers_param):
                    consumer_param = self.buffer.popleft()  # Pop one set from the consumer param queue
                    if not isinstance(consumer_param, tuple):
                        consumer_param = (consumer_param,)   # Force tuple type

                    self.consumers.append(p.apply_async(func=self.consume, args=consumer_param))  # Start new consumer

                # Create producers
                elif producers_param:
                    producer_param = producers_param.popleft()  # Pop one set from the consumer param queue
                    if not isinstance(producer_param, tuple):
                        producer_param = (producer_param,)      # Force tuple type

                    self.producers.append(p.apply_async(func=self.produce, args=producer_param))  # Start new producer

                # Filter finished async_tasks
                finished_producers = [r for r in self.producers if r.ready()] if self.producers else []
                finished_consumers = [r for r in self.consumers if r.ready()] if self.consumers else []

                # Remove finished async_tasks from the running tasks list
                self.producers = [r for r in self.producers if r not in finished_producers]
                self.consumers = [r for r in self.consumers if r not in finished_consumers]

                # Extract result from finished async_tasks
                for r in finished_producers:
                    assert r.ready()
                    self.buffer.append(r.get())  # Get the producer result and put it in the buffer
                for r in finished_consumers:
                    assert r.ready()
                    result.append(r.get())       # Get the consumer tesult and put in in the function local result var

            return result

В элементеmap_result (), когда я пытаюсь "получить ()" результат функции apply_async (...), я получаю следующую ошибку (обратите внимание, что я использую python3):

Traceback (most recent call last):
  File "ProducerConsumer.py", line 91, in <module>
    test()
  File "ProducerConsumer.py", line 85, in test
    result = pc.map_result(input)
  File "ProducerConsumer.py", line 64, in map_result
    self.buffer.append(r.get())  # Get the producer result and put it in the buffer
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 608, in get
    raise self._value
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 385, in _handle_tasks
    put(task)
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 206, in send
    self._send_bytes(ForkingPickler.dumps(obj))
  File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

ИВот некоторый код для воспроизведения моей ошибки (очевидно, зависит от класса):

def test_producer(val):
    return val*12


def test_consumer(val):
    return val/4


def test():
    pc = ProducerConsumer(4, test_producer, test_consumer)
    input    = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]    # Input for the test of the ProducerConsumer class
    expected = [0, 3, 6, 9, 15, 18, 21, 23, 27]  # Expected output for the test of the ProducerConsumer class

    result = pc.map_result(input)

    print('got      : {}'.format(result))
    print('expected : {}'.format(expected))

if __name__ == '__main__':
    test()

Обратите внимание, что в члене класса map_result () я только "get ()" получаю результаты, которые "ready ()"".

Из того, что я знаю о мариновании (что я признаю, это не так уж и много), я бы сказал, что тот факт, что я использую Pool.apply_async (...) для функции-члена, может сыграть свою роль, ноЯ бы очень хотел сохранить структуру классов, если смогу.

Спасибо за помощь!

1 Ответ

0 голосов
/ 07 октября 2018

Итак, проблема была исправлена, когда я также исправил некоторые ошибки зачатия:

Мои 3 буферные переменные (буфер, производители, потребители) не имели ничего общего с классом, так как они были семантически связаны ссам элемент "map_result ()".

Таким образом, патч удалял эти элементы и создавал их как локальные переменные члена "map_result ()".

Проблема, даже если концепциябыл неисправен, мне все еще трудно понять, почему работник не может взломать замок (того параметра, который я теперь полагаю), так что ... Если у кого-то есть четкое объяснение того, что происходит (или ссылка на некоторых), чтобыл бы очень признателен.

...