Можно ли избежать многопоточного сокета UDP в отбрасываемых данных Python? - PullRequest
3 голосов
/ 16 марта 2010

Прежде всего, я новичок в Python и учусь на работе, так что будьте осторожны!

Я пытаюсь написать потоковое приложение Python для Windows, которое считывает данные из сокета UDP (поток-1), записывает их в файл (поток-2) и отображает текущие данные (поток-3) в виджет (gtk.Image с использованием gtk.gdk.pixbuf). Я использую очереди для обмена данными между потоками.

Моя проблема в том, что, если я запускаю только потоки 1 и 3 (поэтому пока пропускаю запись в файл), кажется, что я теряю некоторые данные после первых нескольких выборок. После этой капли все выглядит хорошо. Даже если завершить поток 1 перед выполнением потока 3, это очевидное падение все еще сохраняется.

Извиняюсь за длину фрагмента кода (я удалил поток, который пишет в файл), но я чувствовал, что удаление кода просто вызовет вопросы. Надеюсь, что кто-то может пролить свет: -)

import socket
import threading
import Queue
import numpy
import gtk
gtk.gdk.threads_init()
import gtk.glade
import pygtk


class readFromUDPSocket(threading.Thread):

    def __init__(self, socketUDP, readDataQueue, packetSize, numScans):
        threading.Thread.__init__(self)
        self.socketUDP = socketUDP
        self.readDataQueue = readDataQueue
        self.packetSize = packetSize
        self.numScans = numScans

    def run(self):
        for scan in range(1, self.numScans + 1):
            buffer = self.socketUDP.recv(self.packetSize)
            self.readDataQueue.put(buffer)
        self.socketUDP.close()
        print 'myServer finished!'


class displayWithGTK(threading.Thread):

    def __init__(self, displayDataQueue, image, viewArea):
        threading.Thread.__init__(self)
        self.displayDataQueue = displayDataQueue
        self.image = image
        self.viewWidth = viewArea[0]
        self.viewHeight = viewArea[1]
        self.displayData = numpy.zeros((self.viewHeight, self.viewWidth, 3), dtype=numpy.uint16)

    def run(self):
        scan = 0
        try:
            while True:
                if not scan % self.viewWidth: scan = 0
                buffer = self.displayDataQueue.get(timeout=0.1)
                self.displayData[:, scan, 0] = numpy.fromstring(buffer, dtype=numpy.uint16)
                self.displayData[:, scan, 1] = numpy.fromstring(buffer, dtype=numpy.uint16)
                self.displayData[:, scan, 2] = numpy.fromstring(buffer, dtype=numpy.uint16)
                gtk.gdk.threads_enter()
                self.myPixbuf = gtk.gdk.pixbuf_new_from_data(self.displayData.tostring(), gtk.gdk.COLORSPACE_RGB,
                                                        False, 8, self.viewWidth, self.viewHeight, self.viewWidth * 3)
                self.image.set_from_pixbuf(self.myPixbuf)
                self.image.show()
                gtk.gdk.threads_leave()
                scan += 1
        except Queue.Empty:
            print 'myDisplay finished!'
            pass


def quitGUI(obj):
    print 'Currently active threads: %s' % threading.enumerate()
    gtk.main_quit()


if __name__ == '__main__':

    # Create socket (IPv4 protocol, datagram (UDP)) and bind to address
    socketUDP = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    host = '192.168.1.5'
    port = 1024
    socketUDP.bind((host, port))

    # Data parameters
    samplesPerScan = 256
    packetsPerSecond = 1200
    packetSize = 512
    duration = 1  # For now, set a fixed duration to log data
    numScans = int(packetsPerSecond * duration)

    # Create array to store data
    data = numpy.zeros((samplesPerScan, numScans), dtype=numpy.uint16)

    # Create queue for displaying from
    readDataQueue = Queue.Queue(numScans)

    # Build GUI from Glade XML file
    builder = gtk.Builder()
    builder.add_from_file('GroundVue.glade')
    window = builder.get_object('mainwindow')
    window.connect('destroy', quitGUI)
    view = builder.get_object('viewport')
    image = gtk.Image()
    view.add(image)
    viewArea = (1200, samplesPerScan)

    # Instantiate & start threads
    myServer = readFromUDPSocket(socketUDP, readDataQueue, packetSize, numScans)
    myDisplay = displayWithGTK(readDataQueue, image, viewArea)

    myServer.start()
    myDisplay.start()

    gtk.gdk.threads_enter()
    gtk.main()
    gtk.gdk.threads_leave()
    print 'gtk.main finished!'

Ответы [ 5 ]

4 голосов
/ 16 марта 2010

UDP не проверяет получателя, который его получил (как это делает TCP) - вы должны реализовать ретрансляцию и тому подобное в своих приложениях, если вы хотите обеспечить получение всех данных. Вы управляете отправляющим источником UDP?

2 голосов
/ 26 марта 2010

UDP по определению ненадежен . Вы не должны писать программы, которые ожидают, что UDP-дейтаграммы будут всегда проходить.

Пакеты все время отбрасываются и в TCP, но вашей программе это не нужно, потому что приложения TCP не могут обрабатывать пакетов ; стек TCP показывает вашему приложению поток байтов. Там есть много механизмов, чтобы убедиться, что если вы отправите байты 'ABCD', вы увидите 'A' 'B' 'C' 'D' в конце. Конечно, вы можете получить любую возможную коллекцию пакетов: «ABC», «D» или «AB», CD »и т. Д. Или вы можете просто увидеть« ABC », а затем ничего.

TCP не «надежен», потому что он может волшебным образом заставить ваши сетевые кабели никогда не выходить из строя или ломаться; гарантия, которую он предоставляет, состоит в том, что до момента, когда поток прерывается, вы будете видеть все в порядке. А после прорыва потока вы ничего не увидите.

В UDP такой гарантии нет. Если вы отправите четыре дейтаграммы UDP, 'AB', 'CD', 'EF' 'GH', вы можете получить все из них, или ни один из них, или половину, или только одну из них. Вы можете получить их в любом порядке. Единственная гарантия, которую пытается предоставить UDP, заключается в том, что вы не увидите сообщение с 'ABCD', поскольку эти байты находятся в разных дейтаграммах.

Подводя итог: это не имеет ничего общего с Python, потоками или GTK. Это просто базовый факт жизни в сетях, основанных на физической реальности: иногда электрические характеристики ваших проводов не способствуют передаче ваших сообщений по ним.

Вы можете уменьшить сложность вашей программы, используя Twisted , в частности, API listenUDP , потому что тогда вам не нужно будет манипулировать потоками или их взаимодействием с помощью GTK: вы можете просто вызывать методы непосредственно в рассматриваемом виджете из вашего datagramReceived метода. Но это не решит основную проблему: UDP просто сбрасывает данные иногда, точка. Реальное решение - убедить ваш источник данных использовать вместо него TCP.

0 голосов
/ 17 марта 2010

Кажется, проблема в источнике.Есть две проблемы:

  1. Глядя на Wireshark, источник не последовательно передает 1200 пакетов в секунду.Возможно, как указал Лен, проблема с удалением данных из стека.Кстати, источником является программируемая карта с портом Ethernet, подключенным к моей машине.

  2. Другая проблема заключается в том, что после первых 15 пакетов или около того данных всегда есть сбрасывание.Я обнаружил, что если я получу 20 пакетов в части инициализации потока readFromUDPSocket, я смогу затем прочитать данные, например,

class readFromUDPSocket(threading.Thread):

    def __init__(self, socketUDP, readDataQueue, packetSize, numScans):
        threading.Thread.__init__(self)
        self.socketUDP = socketUDP
        self.readDataQueue = readDataQueue
        self.packetSize = packetSize
        self.numScans = numScans
        for i in range(0, 20):
            buffer = self.socketUDP.recv(self.packetSize)

    def run(self):
        for scan in range(1, self.numScans + 1):
            buffer = self.socketUDP.recv(self.packetSize)
            self.readDataQueue.put(buffer)
        self.socketUDP.close()
        print 'myServer finished!'

Не уверен, на что это указывает?!Я думаю, что все это исключает возможность быстрого и быстрого восстановления.

0 голосов
/ 17 марта 2010

Во-первых; Вы можете установить размер буфера recv для сокета? Если это так, установите его на что-то очень большое, поскольку это позволит стеку UDP буферизовать больше дейтаграмм для вас.

Во-вторых, если вы можете использовать асинхронный ввод-вывод, то отправляйте сразу несколько вызовов recv (опять же, это позволяет стеку обслуживать больше дейтаграмм, прежде чем он начнет их отбрасывать).

В-третьих; Вы можете попытаться немного развернуть цикл и прочитать несколько дейтаграмм, прежде чем поместить их в свою очередь; может ли блокировка очереди вызывать медленную работу потока recv ??

Наконец; датаграммы могут быть сброшены в другом месте в сети, вы ничего не можете сделать, что U в UDP ...

0 голосов
/ 16 марта 2010

Редактировать - Вычеркнул предложение прослушать / принять, спасибо Дэниел, я только что пришел, чтобы удалить его, когда увидел твой комментарий:)

Я бы предположил, что это проблема сетевого программирования, а не python как таковая.

Вы установили частоту пакетов в секунду и продолжительность, чтобы определить количество вызовов recv, которые вы делаете на свой сокет UDP. Я не вижу вызовов listen или accept в сокет, я предполагаю, что recv обрабатывает, как вы говорите, вы получаете некоторые данные . Вы не упомянули генерацию данных.

Вы определили, сколько операций чтения вы ожидаете сделать, поэтому я предполагаю, что код делает столько же запросов до выхода, поэтому я пришел к выводу, что ваш recv packetSize недостаточен, и поэтому одно чтение не не тянет всю дейтаграмму, затем последующее recv вытягивает следующую часть предыдущей дейтаграммы.

Разве вы не можете посмотреть на полученные данные и определить, чего не хватает? Какие данные вы «теряете»? Откуда ты знаешь, что он потерян?

Кроме того, вы можете использовать wireshark, чтобы убедиться, что ваш хост фактически получает данные одновременно с проверкой размера дейтаграмм. Сопоставьте перехват с данными, которые предоставляет ваш поток recv.


Update

Вы говорите, что теряете данные, но не то, что это. Я вижу две возможности потери данных:

  • Усечение пакетов
  • Отбрасывание пакетов

Вы сказали, что размер полезной нагрузки совпадает с размером, который вы передаете в recv, поэтому я буду считать, что вы не усекаете.

Таким образом, факторы для отбрасывания пакетов - это комбинация скорости приема, скорости чтения из буфера приема и размера буфера приема.

Ваши звонки на Queue.put могут замедлять скорость чтения.

Итак, сначала определите, что вы можете читать 1200 пакетов в секунду, изменив readFromUDPSocket на Queue.put, но не подсчитав количество полученных и полученных отчетов.

Как только вы определили, что можете позвонить recv достаточно быстро, следующим шагом будет выяснение того, что вас тормозит. Я подозреваю, что это может быть использование вами Queue, я предлагаю группировать полезные нагрузки в группах размера N для размещения на Queue, чтобы вы не пытались вызвать put на 12 Гц.

Учитывая, что вы хотите поддерживать скорость 1200 операций чтения в секунду, я не думаю, что вы сильно продвинетесь, увеличив приемный буфер на сокете.

...