Получение UDP-сообщения при обработке очереди - PullRequest
0 голосов
/ 13 декабря 2018

В попытке решить это я пытаюсь упростить проблему.Предположим, у меня есть получатель, который прослушивает как TCP, так и UDP сообщения.Он получит несколько строк, добавит их в deque и после получения сообщения "finish" начнет обработку deque.

Если я получу сообщение UDP, мне нужно остановить обработку, удалить последний элементof deque и затем продолжите обработку.

from collections import deque

host = commands.getoutput("hostname -I")
port = 5005
backlog = 5
BUFSIZE = 4096
q = deque()


def read_tcp(s):
    conn, addr = s.accept()
    print('Connected with', *addr)
    while 1:
        data = conn.recv(BUFFER_SIZE)
        if not data: break
        print "received data:", data
        conn.send(data)  # echo
    conn.close()
    if (data == 'finish'):
        processP(q)
    else:
        q.append(data)

def read_udp(s):
    data,addr = s.recvfrom(1024)
    print("received message:", data)
    del q[-1]


processP(q):
    text = q.popleft()
    textReverse = text[::-1]
    print(textReverse)

def run():
    # create tcp socket
    tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        tcp.bind((host,port))
    except socket.error as err:
        print('Bind failed', err)
        return
    tcp.listen(1)
    # create udp socket
    udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
    udp.bind((host,port))
    print('***Socket now listening at***:', host, port)
    input = [tcp,udp]
    try:
        while True:
            inputready,outputready,exceptready = select(input,[],[])
            for s in inputready:
                if s == tcp:
                    read_tcp(s)
                elif s == udp:
                    read_udp(s)
                else:
                    print("unknown socket:", s)
    # Hit Break / Ctrl-C to exit
    except KeyboardInterrupt:
        print('\nClosing')
        raise
    tcp.close()
    udp.close()

if __name__ == '__main__':
    run()

У меня проблема в , приостановившей программу после получения UDP-сообщения и затем возвращающейся к фазе обработки .Прямо сейчас, если во время обработки в мою программу будет отправлено UDP-сообщение, оно не получит сообщение до конца обработки (и тогда очередь пуста).Я подумал, что может помочь многопоточность или многопроцессорность, но я не могу понять, как применить их к коду.

1 Ответ

0 голосов
/ 13 декабря 2018

Никто не заставляет вас опустошать очередь.Вы можете проверить, пришло ли UDP-сообщение, перед тем как снять очередную рабочую нагрузку.И это настолько, насколько вы можете получить с потоками, так как они не позволяют вам прерывать произвольный код.Они всегда могут быть прекращены только совместно.

Если обработка вашего отдельного элемента занимает слишком много времени, тогда можно выбрать многопроцессорную обработку рабочих элементов, поскольку вы можете убить внешний процесс.

Использовать select.select дляпроверьте входящие данные на ваших сокетах с коротким таймаутом, прежде чем продолжить обработку следующей рабочей нагрузки.В качестве альтернативы вы можете использовать поток, ожидающий ввода в потоке, и манипулировать удалением.

EDIT Это ваш код, созданный для работы с python3, select.select и таймаутом.Запуск read_udp работает с netcat с echo foo | nc -4 -u localhost 5005, но затем вызывает исключение, потому что вы предполагаете существование элементов в очереди - это проблема логики приложения, которая не зависит от вопроса, как чередовать прослушивание и работу.

import socket
import select
from collections import deque

host = "localhost"
port = 5005
backlog = 5
BUFSIZE = 4096
q = deque()


def read_tcp(s):
    conn, addr = s.accept()
    print('Connected with', *addr)
    while 1:
        data = conn.recv(BUFFER_SIZE)
        if not data: break
        print("received data:", data)
        conn.send(data)  # echo
    conn.close()
    if (data == 'finish'):
        processP(q)
    else:
        q.append(data)

def read_udp(s):
    data,addr = s.recvfrom(1024)
    print("received message:", data)
    del q[-1]


def processP(q):
    text = q.popleft()
    textReverse = text[::-1]
    print(textReverse)

def run():
    # create tcp socket
    tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    tcp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        tcp.bind((host,port))
    except socket.error as err:
        print('Bind failed', err)
        return
    tcp.listen(1)
    # create udp socket
    udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
    udp.bind((host,port))
    print('***Socket now listening at***:', host, port)
    input = [tcp,udp]
    try:
        while True:
            print("select.select")
            inputready,outputready,exceptready = select.select(input,[],[], 0.1)
            for s in inputready:
                if s == tcp:
                    read_tcp(s)
                elif s == udp:
                    read_udp(s)
                else:
                    print("unknown socket:", s)
    # Hit Break / Ctrl-C to exit
    except KeyboardInterrupt:
        print('\nClosing')
        raise
    tcp.close()
    udp.close()

if __name__ == '__main__':
    run()
...