Я хочу реализовать алгоритм потребителя / производителя для приложения IoT, используя Python. В настоящее время я использую модуль multiprocessing.dummy следующим образом:
import multiprocessing.dummy as mt
import queue
n_writers = len(writers)
queues = [queue.Queue(max_q_size) for i in range(n_writers)]
sensor = Sensor()
def _stream(qs: List[queue.Queue]):
while True:
point = sensor.read()
for q in qs:
q.put(point)
time.sleep(1/(2*self.sampling_rate))
def _write_worker(q: queue.Queue, writer: Writer):
while True:
sensor._logger.debug(f'{type(writer).__name__} q size: {q.qsize()}')
if not q.empty():
writer.write([q.get()])
else:
time.sleep(2 / self.sampling_rate)
workers = mt.Pool(n_writers)
streamer = mt.Process(target=_stream, args=(queues,))
streamer.start()
with workers:
workers.starmap(_write_worker, zip(queues, writers))
Датчик - это настраиваемый объект (слой абстракции на датчике).
Это работает, но не является надежным, поскольку не вызывает ошибок, которые возникают в методе объекта, вызываемом процессами (см. здесь )
Я хотел бы сделать то же самое с модулем concurrent.futures . Я могу использовать его для запуска параллельных потоков, которые я хочу собрать, когда все они будут завершены, но я не нашел способа создать долго работающий стример и воркеры, как это было в примере выше. Что мне не хватает?