Никто не заставляет вас опустошать очередь.Вы можете проверить, пришло ли UDP-сообщение, перед тем как снять очередную рабочую нагрузку.И это настолько, насколько вы можете получить с потоками, так как они не позволяют вам прерывать произвольный код.Они всегда могут быть прекращены только совместно.
Если обработка вашего отдельного элемента занимает слишком много времени, тогда можно выбрать многопроцессорную обработку рабочих элементов, поскольку вы можете убить внешний процесс.
Использовать select.select
дляпроверьте входящие данные на ваших сокетах с коротким таймаутом, прежде чем продолжить обработку следующей рабочей нагрузки.В качестве альтернативы вы можете использовать поток, ожидающий ввода в потоке, и манипулировать удалением.
EDIT Это ваш код, созданный для работы с python3, select.select и таймаутом.Запуск read_udp работает с netcat с echo foo | nc -4 -u localhost 5005
, но затем вызывает исключение, потому что вы предполагаете существование элементов в очереди - это проблема логики приложения, которая не зависит от вопроса, как чередовать прослушивание и работу.
import socket
import select
from collections import deque
host = "localhost"
port = 5005
backlog = 5
BUFSIZE = 4096
q = deque()
def read_tcp(s):
conn, addr = s.accept()
print('Connected with', *addr)
while 1:
data = conn.recv(BUFFER_SIZE)
if not data: break
print("received data:", data)
conn.send(data) # echo
conn.close()
if (data == 'finish'):
processP(q)
else:
q.append(data)
def read_udp(s):
data,addr = s.recvfrom(1024)
print("received message:", data)
del q[-1]
def processP(q):
text = q.popleft()
textReverse = text[::-1]
print(textReverse)
def run():
# create tcp socket
tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
tcp.bind((host,port))
except socket.error as err:
print('Bind failed', err)
return
tcp.listen(1)
# create udp socket
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
udp.bind((host,port))
print('***Socket now listening at***:', host, port)
input = [tcp,udp]
try:
while True:
print("select.select")
inputready,outputready,exceptready = select.select(input,[],[], 0.1)
for s in inputready:
if s == tcp:
read_tcp(s)
elif s == udp:
read_udp(s)
else:
print("unknown socket:", s)
# Hit Break / Ctrl-C to exit
except KeyboardInterrupt:
print('\nClosing')
raise
tcp.close()
udp.close()
if __name__ == '__main__':
run()