Отправка данных через сокет из другого потока не работает в Python - PullRequest
2 голосов
/ 08 июля 2010

Это мой «игровой сервер». Ничего серьезного, я подумал, что это хороший способ узнать кое-что о питоне и сокетах.

Сначала класс сервера инициализировал сервер. Затем, когда кто-то подключается, мы создаем клиентский поток. В этой теме мы постоянно слушаем наш сокет.

Когда приходит определенная команда (например, I12345001001), она порождает другой поток.

Целью этого последнего потока является отправка обновлений клиенту. Но хотя я вижу, что сервер выполняет этот код, данные на самом деле не отправляются.

Кто-нибудь может сказать, где что-то идет не так? Как будто я должен что-то получить, прежде чем смогу отправить. Так что я предполагаю, что где-то мне чего-то не хватает


#!/usr/bin/env python

"""
An echo server that uses threads to handle multiple clients at a time.
Entering any line of input at the terminal will exit the server.
"""

import select
import socket
import sys
import threading
import time
import Queue

globuser = {}
queue = Queue.Queue()

class Server:
    def __init__(self):
        self.host = ''
        self.port = 2000
        self.backlog = 5
        self.size = 1024
        self.server = None
        self.threads = []

    def open_socket(self):
        try:
            self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server.bind((self.host,self.port))
            self.server.listen(5)
        except socket.error, (value,message):
            if self.server:
                self.server.close()
            print "Could not open socket: " + message
            sys.exit(1)

    def run(self):
        self.open_socket()
        input = [self.server,sys.stdin]
        running = 1
        while running:
            inputready,outputready,exceptready = select.select(input,[],[])

            for s in inputready:

                if s == self.server:
                    # handle the server socket
                    c = Client(self.server.accept(), queue)
                    c.start()
                    self.threads.append(c)

                elif s == sys.stdin:
                    # handle standard input
                    junk = sys.stdin.readline()
                    running = 0

        # close all threads

        self.server.close()
        for c in self.threads:
            c.join()

class Client(threading.Thread):
    initialized=0

    def __init__(self,(client,address), queue):
        threading.Thread.__init__(self)
        self.client = client
        self.address = address
        self.size = 1024
        self.queue = queue
        print 'Client thread created!'


    def run(self):
        running = 10
        isdata2=0
        receivedonce=0

        while running > 0:

            if receivedonce == 0:
                print 'Wait for initialisation message'
                data = self.client.recv(self.size)
                receivedonce = 1

            if self.queue.empty():
                print 'Queue is empty'
            else:
                print 'Queue has information'
                data2 = self.queue.get(1, 1)
                isdata2 = 1
                if data2 == 'Exit':
                    running = 0
                    print 'Client is being closed'
                    self.client.close()

            if data:
                print 'Data received through socket! First char: "' + data[0] + '"'
                if data[0] == 'I':
                    print 'Initializing user'
                    user = {'uid': data[1:6] ,'x': data[6:9], 'y': data[9:12]}
                    globuser[user['uid']] = user
                    print globuser
                    initialized=1
                    self.client.send('Beginning - Initialized'+';')
                    m=updateClient(user['uid'], queue)
                    m.start()
                else:
                    print 'Reset receivedonce'
                    receivedonce = 0

                print 'Sending client data'
                self.client.send('Feedback: ' +data+';')
                print 'Client Data sent: ' + data

            data=None

            if isdata2 == 1:
                print 'Data2 received: ' + data2
                self.client.sendall(data2)
                self.queue.task_done()
                isdata2 = 0

            time.sleep(1)
            running = running - 1

        print 'Client has stopped'


class updateClient(threading.Thread):

    def __init__(self,uid, queue):
        threading.Thread.__init__(self)
        self.uid = uid
        self.queue = queue
        global globuser
        print 'updateClient thread started!'

    def run(self):
        running = 20
        test=0
        while running > 0:
            test = test + 1
            self.queue.put('Test Queue Data #' + str(test))
            running = running - 1
            time.sleep(1)

        print 'Updateclient has stopped'


if __name__ == "__main__":
    s = Server()
    s.run() 

Ответы [ 2 ]

3 голосов
/ 08 июля 2010

Я не понимаю вашу логику - в частности, почему вы специально настроили два потока, пишущих одновременно в один и тот же сокет (который они оба называют self.client), без какой-либо синхронизации или координации, договоренности, котораякажется гарантированно вызовет проблемы.

В любом случае, определенная ошибка в вашем коде заключается в том, что вы используете метод send - вы, похоже, уверены, что он гарантирует отправку всей своей строки аргумента, но это определенно не регистр, см. документы :

Возвращает количество отправленных байтов.Приложения отвечают за проверку того, что все данные были отправлены;если были переданы только некоторые данные, приложение должно попытаться доставить оставшиеся данные.

sendall - это метод, который вы, вероятно, хотите:

В отличие от send (), этот метод продолжает отправлять данные из строки до тех пор, пока не будут отправлены все данные или не возникнет ошибка.

Другие проблемы включают в себя тот факт, что updateClient явно предназначен дляникогда не завершать (в отличие от двух других потоковых классов - когда они завершаются, экземпляры updateClient не будут, и они просто будут продолжать работать и поддерживать процесс в рабочем состоянии), избыточные операторы global (безобидные, просто сбивают с толку),некоторые потоки пытаются прочитать диктовку (с помощью метода iteritems), в то время как другие потоки меняют его, опять же без какой-либо блокировки или координации, и т. д., и т. д. - я уверен, что может быть еще больше ошибок или проблем, но послезаметив несколько, глаза, как правило, начинают мерцать; -).

0 голосов
/ 27 сентября 2014

У вас три основные проблемы. Первая проблема, скорее всего, ответ на ваш вопрос.

Блокировка (проблема вопроса)

socket.recv блокируется. Это означает, что выполнение остановлено и поток переходит в спящий режим, пока не сможет прочитать данные из сокета. Таким образом, ваш третий поток обновлений просто заполняет очередь, но он очищается только при получении сообщения. Очередь также очищается одним сообщением за раз.

Вероятно, поэтому он не будет отправлять данные, если вы не отправите данные.

Протокол сообщений в потоке Протокол

Вы пытаетесь использовать поток сокетов как поток сообщений. Я имею в виду, что у вас есть:

 self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

Часть SOCK_STREAM говорит, что это поток, а не сообщение, такое как SOCK_DGRAM. Тем не менее, TCP не поддерживает сообщения. Итак, вам нужно создать такие сообщения, как:

 data =struct.pack('I', len(msg)) + msg
 socket.sendall(data)

Тогда принимающая сторона будет искать поле длины и считывать данные в буфер. Как только в буфере окажется достаточно данных, он может получить сообщение whole .

Ваша текущая настройка работает, потому что ваши сообщения достаточно малы, чтобы все они помещались в один и тот же пакет и вместе помещались в буфер сокетов. Однако, как только вы начнете отправлять большие данные по нескольким вызовам с помощью socket.send или socket.sendall, вы начнете читать несколько сообщений и частичные сообщения, если не внедрите протокол сообщений поверх потока байтов сокета.

Тема

Несмотря на то, что потоки могут быть более просты в использовании при запуске, они имеют много проблем и могут ухудшить производительность при неправильном использовании, особенно в Python. Я люблю темы, поэтому не поймите меня неправильно. В Python также есть проблема с GIL (глобальной блокировкой интерпретатора), поэтому вы получаете плохую производительность при использовании потоков, связанных с процессором. Ваш код в основном связан с вводом / выводом в данный момент, но в будущем он может стать связанным с процессором. Также вам нужно беспокоиться о блокировке с помощью потоков. Нить может быть быстрым, но не лучшим решением. Существуют обстоятельства, когда многопоточность - самый простой способ прервать трудоемкий процесс. Так что не отбрасывайте темы как злые или ужасные. В Python они считаются плохими в основном из-за GIL, а в других языках (включая Python) из-за проблем параллелизма, поэтому большинство людей рекомендуют использовать несколько процессов с Python или использовать асинхронный код. Тема использования потока или нет очень сложна, так как это зависит от языка (способ выполнения кода), системы (один или несколько процессоров) и конкуренции (попытка поделиться ресурсом с блокировкой) и других факторов. , но, как правило, асинхронный код работает быстрее, поскольку он использует больше ресурсов ЦП с меньшими издержками, особенно если вы не связаны с процессором.

Решением является использование модуля select в Python или чего-то подобного. Он сообщит вам, когда сокет имеет данные для чтения, и вы можете установить параметр тайм-аута.

Вы можете повысить производительность, выполняя асинхронную работу (асинхронные сокеты). Чтобы перевести сокет в асинхронный режим, вы просто вызываете socket.settimeout(0), что делает его не блокированным. Однако вы будете постоянно кушать процессор, ожидая данных. Модуль select и друзья не дадут вам раскрутиться.

Как правило, для повышения производительности вы хотите выполнять как можно больше асинхронных (одинаковых потоков), а затем расширять их с помощью большего количества потоков, которые также выполняют максимально асинхронную работу. Однако, как отмечалось ранее, Python является исключением из этого правила из-за GIL (глобальной блокировки интерпретатора), которая может фактически снизить производительность по сравнению с тем, что я прочитал. Если вам интересно, попробуйте написать тестовый пример и узнайте!

Вы также должны проверить примитивы блокировки потоков в модуле threading. Это Lock, RLock и Condition. Они могут помочь нескольким потокам обмениваться данными без проблем.

lock = threading.Lock()
def myfunction(arg):
    with lock:
        arg.do_something()

Некоторые объекты Python являются потокобезопасными, а другие - нет.

Асинхронная отправка обновлений (улучшение)

Вместо использования третьего потокатолько для отправки обновлений вы могли бы вместо этого использовать поток клиента для отправки обновлений, проверяя текущее время и время последней отправки обновления.Это исключило бы использование Queue и Thread.Также для этого вы должны преобразовать свой клиентский код в асинхронный код и установить таймаут на select, чтобы вы могли через некоторое время проверять текущее время, чтобы узнать, требуется ли обновление.

Сводка

Я бы порекомендовал вам переписать свой код с использованием асинхронного кода сокета.Я также рекомендовал бы использовать один поток для всех клиентов и сервера.Это улучшит производительность и уменьшит задержку.Это облегчит отладку, потому что у вас не будет возможных проблем с параллелизмом, как у вас с потоками.Кроме того, исправьте ваш протокол сообщений, прежде чем он не сможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...