Пул потоков зависает на json.loads при отправке нескольких объектов json - PullRequest
0 голосов
/ 27 августа 2018

В настоящее время я делаю простой сервер с использованием Python 3.x.Я использую пул потоков, поскольку в любой момент времени может быть подключено большое количество клиентов.Клиент отправляет объект json на сервер, где он затем анализируется и обрабатывается.Однако, когда я отправляю несколько объектов json с помощью sendall с клиента, сервер зависает и не будет обрабатывать json.loads в функции handle_client.

Это всего лишь пример, так как я учусь использовать сокетыи потоки потоков в питоне.В конце концов мне понадобятся постоянные соединения.Любое объяснение, почему оно зависает, высоко ценится.Спасибо!

Сервер

import sys, socket, threading, json, time, concurrent.futures

HOST = socket.gethostbyname(socket.gethostname())
PORT = 65000
TIMEOUT = 5
MAX_CLIENTS = 5
BUFFER_SIZE = 1024
ENCODING_TYPE = 'utf-8'

clients = []

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((HOST, PORT))
server.settimeout(TIMEOUT)
server.setblocking(0)
server.listen(MAX_CLIENTS)
print("Server started successfully")

Pool = concurrent.futures.ThreadPoolExecutor(max_workers=5)

def handle_client(client):
    if client not in clients:
        clients.append(client)
        print('%s clients connected.' % len(clients))
    while True:
        data = client.recv(BUFFER_SIZE).decode(ENCODING_TYPE)
        if data:
            jsonObj = json.loads(data)
            client.sendall('Keep up the great work!'.encode(ENCODING_TYPE))
        else:
            clients.remove(client)
            client.close()
        print('%(name)s is a %(occupation)s!' % {'name': jsonObj['name'], 'occupation': jsonObj['occupation']})

while True:
    try:
        client, addr = server.accept()
        client.setblocking(0)
        Pool.submit(handle_client, client)
        Pool.shutdown(wait=False)
    except BlockingIOError:
        pass

server.close()
print('Server shutdown')
sys.exit()

Клиент

import sys, socket, json, time

HOST = socket.gethostbyname(socket.gethostname())
PORT = 65000
BUFFER_SIZE = 1024
ENCODING_TYPE = 'utf-8'

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((HOST, PORT))

data1 = {
        'name': 'John Doe',
        'age': 23,
        'occupation': 'QA Engineer',
        'employer': 'Samsung'
    }
data2 = {
        'name': 'Jane Roe',
        'age': 32,
        'occupation': 'HR Representative',
        'employer': 'Samsung'
    }
packet1 = json.dumps(data1)
packet2 = json.dumps(data2)

client.sendall(packet1.encode(ENCODING_TYPE))
client.sendall(packet2.encode(ENCODING_TYPE))

while True:
    response = client.recv(BUFFER_SIZE).decode(ENCODING_TYPE)
    print(response)

client.close()

print('Client terminated')

1 Ответ

0 голосов
/ 27 августа 2018

Здесь есть ряд проблем, но я думаю, что ваши проблемы начинаются с этой строки:

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

Важно понимать, что означает эта строка.Вы создаете сокет TCP .TCP является «ориентированным на поток» протоколом.Это означает, что у него нет понятия «пакеты».send() и sendall() вставляют байты в сокет, а recv() вытаскивает байты из него.После того, как байты были вставлены в сокет, TCP не предпринимает попыток отследить, «из какого send()» был получен байт.

Если a и b оба являются bytes объектами, sendall(a); sendall(b) в точности соответствует sendall(a+b) (существуют значительные различия в производительности, которые я намеренно игнорирую).Точно так же recv(1024) означает «получить следующие 1024 байта из сокета или дать мне меньше, если у вас еще нет 1024 байтов».Если одноранговый узел сделал две посылки подряд, то recv может объединять их вместе или не объединять, в зависимости от времени (и других факторов, таких как алгоритм Нейгла ).Точно так же очень большая передача может быть разделена на несколько recvs (даже если размер буфера достаточно большой).

В общем, при разработке протокола поверх TCP вы должны позаботиться об определенных битах«документы» для обеспечения бесперебойной работы:

  • Большинство протоколов имеют понятие «сообщения».Например, HTTP имеет запросов и ответов , а SMTP и FTP имеют команды.В вашем протоколе клиент отправляет объекты JSON, а сервер отправляет фиксированную строку;для вас «сообщением» может быть либо один объект JSON, либо один экземпляр этой фиксированной строки.
  • Обычно каждый узел чередует отправку и получение сообщений до тех пор, пока соединение не будет закрыто, хотя возможны более сложные соглашения.
    • Если какой-либо одноранговый узел должен получать несколько сообщений подряд, он должен иметь возможность определить, где заканчивается одно сообщение и начинается следующее, предпочтительно без большого количества сложных манипуляций со строками.Вот почему многие протоколы предпочитают чередовать.
  • Во время приема одноранговый узел должен убедиться, что он получил все сообщение перед отправкой чего-либо (иначе другой одноранговый узел отправит больше данных, итеперь вы вернулись к проблеме «найти, где заканчивается сообщение»).Для этого может потребоваться объединение результатов нескольких вызовов в recv().Одноранговый узел должен быть готов к изящной обработке частичных сообщений, например:
    • Возможно (если в некоторой степени маловероятно) разделение многобайтового символа UTF-8.Такая последовательность байтов не сможет .decode() с UnicodeError.
    • . Возможно, JSON-объект будет разделен.Такая последовательность символов завершится с ошибкой в ​​json.loads() с ValueError.
  • С неблокирующими сокетами недостаточно вызвать recv, пока не произойдет сбой с BlockingIOError,потому что более поздние пакеты могут быть задержаны или намеренно задержаны по соображениям производительности (небольшие пакеты относительно дороги).Принимающий узел должен фактически проверить полученные данные и проверить, является ли оно полным сообщением.

Наконец, вот несколько других замечаний, которые у меня есть относительно кода вашего сервера:

  • Я не уверен, что делает handle_client().Похоже, он попадает в бесконечный цикл, в котором он многократно .close() с сокета клиента, а затем пытается все равно извлечь из него снова.Это недопустимо.
  • Точно так же ваш код верхнего уровня несколько раз .shutdown() s пул, затем пытается отправить больше задач, что также недопустимо.
  • Если вы используете неблокирующий I /О, вам, вероятно, также необходимо позвонить по номеру select() или poll(), чтобы определить, готовы ли клиенты к чтению.Поскольку ваш код в настоящее время стоит, вы отказываетесь от них.
...