Как создать длительные потоки с модулем concurrent.futures в Python (для проблемы производителя / потребителя)? - PullRequest
0 голосов
/ 27 мая 2020

Я хочу реализовать алгоритм потребителя / производителя для приложения IoT, используя Python. В настоящее время я использую модуль multiprocessing.dummy следующим образом:

import multiprocessing.dummy as mt
import queue

n_writers = len(writers)
queues = [queue.Queue(max_q_size) for i in range(n_writers)]
sensor = Sensor()


def _stream(qs: List[queue.Queue]):
    while True:
        point = sensor.read()
        for q in qs:
            q.put(point)
        time.sleep(1/(2*self.sampling_rate))

def _write_worker(q: queue.Queue, writer: Writer):
    while True:
        sensor._logger.debug(f'{type(writer).__name__} q size: {q.qsize()}')
        if not q.empty():
            writer.write([q.get()])
        else:
            time.sleep(2 / self.sampling_rate)

workers = mt.Pool(n_writers)

streamer = mt.Process(target=_stream, args=(queues,))
streamer.start()
with workers:
    workers.starmap(_write_worker, zip(queues, writers))

Датчик - это настраиваемый объект (слой абстракции на датчике).

Это работает, но не является надежным, поскольку не вызывает ошибок, которые возникают в методе объекта, вызываемом процессами (см. здесь )

Я хотел бы сделать то же самое с модулем concurrent.futures . Я могу использовать его для запуска параллельных потоков, которые я хочу собрать, когда все они будут завершены, но я не нашел способа создать долго работающий стример и воркеры, как это было в примере выше. Что мне не хватает?

...