У вас три основные проблемы. Первая проблема, скорее всего, ответ на ваш вопрос.
Блокировка (проблема вопроса)
socket.recv
блокируется. Это означает, что выполнение остановлено и поток переходит в спящий режим, пока не сможет прочитать данные из сокета. Таким образом, ваш третий поток обновлений просто заполняет очередь, но он очищается только при получении сообщения. Очередь также очищается одним сообщением за раз.
Вероятно, поэтому он не будет отправлять данные, если вы не отправите данные.
Протокол сообщений в потоке Протокол
Вы пытаетесь использовать поток сокетов как поток сообщений. Я имею в виду, что у вас есть:
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
Часть SOCK_STREAM
говорит, что это поток, а не сообщение, такое как SOCK_DGRAM
. Тем не менее, TCP не поддерживает сообщения. Итак, вам нужно создать такие сообщения, как:
data =struct.pack('I', len(msg)) + msg
socket.sendall(data)
Тогда принимающая сторона будет искать поле длины и считывать данные в буфер. Как только в буфере окажется достаточно данных, он может получить сообщение whole .
Ваша текущая настройка работает, потому что ваши сообщения достаточно малы, чтобы все они помещались в один и тот же пакет и вместе помещались в буфер сокетов. Однако, как только вы начнете отправлять большие данные по нескольким вызовам с помощью socket.send
или socket.sendall
, вы начнете читать несколько сообщений и частичные сообщения, если не внедрите протокол сообщений поверх потока байтов сокета.
Тема
Несмотря на то, что потоки могут быть более просты в использовании при запуске, они имеют много проблем и могут ухудшить производительность при неправильном использовании, особенно в Python. Я люблю темы, поэтому не поймите меня неправильно. В Python также есть проблема с GIL (глобальной блокировкой интерпретатора), поэтому вы получаете плохую производительность при использовании потоков, связанных с процессором. Ваш код в основном связан с вводом / выводом в данный момент, но в будущем он может стать связанным с процессором. Также вам нужно беспокоиться о блокировке с помощью потоков. Нить может быть быстрым, но не лучшим решением. Существуют обстоятельства, когда многопоточность - самый простой способ прервать трудоемкий процесс. Так что не отбрасывайте темы как злые или ужасные. В Python они считаются плохими в основном из-за GIL, а в других языках (включая Python) из-за проблем параллелизма, поэтому большинство людей рекомендуют использовать несколько процессов с Python или использовать асинхронный код. Тема использования потока или нет очень сложна, так как это зависит от языка (способ выполнения кода), системы (один или несколько процессоров) и конкуренции (попытка поделиться ресурсом с блокировкой) и других факторов. , но, как правило, асинхронный код работает быстрее, поскольку он использует больше ресурсов ЦП с меньшими издержками, особенно если вы не связаны с процессором.
Решением является использование модуля select
в Python или чего-то подобного. Он сообщит вам, когда сокет имеет данные для чтения, и вы можете установить параметр тайм-аута.
Вы можете повысить производительность, выполняя асинхронную работу (асинхронные сокеты). Чтобы перевести сокет в асинхронный режим, вы просто вызываете socket.settimeout(0)
, что делает его не блокированным. Однако вы будете постоянно кушать процессор, ожидая данных. Модуль select
и друзья не дадут вам раскрутиться.
Как правило, для повышения производительности вы хотите выполнять как можно больше асинхронных (одинаковых потоков), а затем расширять их с помощью большего количества потоков, которые также выполняют максимально асинхронную работу. Однако, как отмечалось ранее, Python является исключением из этого правила из-за GIL (глобальной блокировки интерпретатора), которая может фактически снизить производительность по сравнению с тем, что я прочитал. Если вам интересно, попробуйте написать тестовый пример и узнайте!
Вы также должны проверить примитивы блокировки потоков в модуле threading
. Это Lock
, RLock
и Condition
. Они могут помочь нескольким потокам обмениваться данными без проблем.
lock = threading.Lock()
def myfunction(arg):
with lock:
arg.do_something()
Некоторые объекты Python являются потокобезопасными, а другие - нет.
Асинхронная отправка обновлений (улучшение)
Вместо использования третьего потокатолько для отправки обновлений вы могли бы вместо этого использовать поток клиента для отправки обновлений, проверяя текущее время и время последней отправки обновления.Это исключило бы использование Queue
и Thread
.Также для этого вы должны преобразовать свой клиентский код в асинхронный код и установить таймаут на select
, чтобы вы могли через некоторое время проверять текущее время, чтобы узнать, требуется ли обновление.
Сводка
Я бы порекомендовал вам переписать свой код с использованием асинхронного кода сокета.Я также рекомендовал бы использовать один поток для всех клиентов и сервера.Это улучшит производительность и уменьшит задержку.Это облегчит отладку, потому что у вас не будет возможных проблем с параллелизмом, как у вас с потоками.Кроме того, исправьте ваш протокол сообщений, прежде чем он не сможет.