Адрес администратора многопроцессорной очереди Python, который уже используется - PullRequest
0 голосов
/ 12 ноября 2018

В настоящее время я пытаюсь реализовать связь между различными процессами в Python 3.6.7.Моя идея состояла в том, чтобы использовать очереди, потому что именованных каналов было недостаточно.

У меня есть слушатель, реализованный следующим образом:

result_queue = queue.Queue()
BaseManager.register('queue', callable=lamda: result_queue)
queue_manager = BaseManager(address=('127.0.0.1', 50000))
queue_manager.start()
while do_run:
   data = result_queue.get()
   print(data)
queue_manager.shutdown()

И у меня есть писатель, определенный следующим образом:

BaseManager.register('queue')
manager = BaseManager(address=('127.0.0.1', 50000))
manager.connect()
queue = manager.queue()
queue.put(message)

Проблема в том, что когда я запускаю свои модульные тесты в среде Docker, я получаю следующую ошибку:

 OSError: [Errno 98] Address already in use

Кажется, что работает нормально для первого теста, но продолжает иметьприведенная выше ошибка для всех последующих тестов.

Я предполагаю, что BaseManager обнаружил «TIME_WAIT» на порту, но, похоже, я не могу установить опцию повторного использования сокета вместо ожидания.

Я что-то здесь не так делаю или есть способы заставить повторно использовать сокеты?

1 Ответ

0 голосов
/ 12 ноября 2018

Простой способ исправить это - позволить ОС выбрать порт:

queue_manager = BaseManager(address=('127.0.0.1', 0))
print(queue_manager.adddress)

Поскольку вы запускаете прослушиватель и модуль записи из общего родительского процесса, вы можете сделать это выше, захватить адрес где-нибудь (канал, файл и т. Д.), А затем передать номер порта другому процессу.

Таким образом, вы даже можете запускать несколько экземпляров теста параллельно на одном компьютере одновременно без конфликтов. ОС будет следить за тем, чтобы вы получали порт, который не используется каждый раз.

...