Получение данных из задачи в соединение с Asyncio - PullRequest
0 голосов
/ 26 апреля 2018

Так что это должно быть довольно просто, у меня есть клиент, который подключается к серверу, и он может нормально получать сообщения с сервера.Однако мне нужно иметь возможность отправлять сообщения на сервер.Я использую Asyncio для обработки этих асинхронно, но у меня есть проблема с этим.Как получить пользовательский ввод для моего клиента, чтобы он мог использовать свой транспорт для отправки данных на сервер.Вот код.

import asyncio, zen_utils, json
from struct import *

class ChatClient(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.data = b''
        print('Accepted connection from {}'.format(self.address))

        self.username = b'jacksonm'
        json_name = b'{"USERNAME": "'+self.username+b'"}'
        length = len(json_name)
        code_len = pack(b'!I', length)
        message = code_len + json_name
        self.json_loaded = False
        self.next_length = -1
        self.transport.write(message)

    def data_received(self, data):
        self.data += data

        if (self.json_loaded == False):
            self.compile_server_data(data)
        elif(self.json_loaded):
            if (len(self.data) > 4 and self.next_length == -1):
                self.next_length = self.data[:4]
                self.next_length = unpack(b'!I', self.next_length)[0]
                print("LENGTH: ", self.next_length)

            elif (len(self.data) >= self.next_length):
                self.data = self.data[4:]
                print("MESSAGE: ", self.data)
                self.next_length = -1

    def compile_server_data(self, data):
        if (self.data.find(b'}') != -1):
            start_index = self.data.find(b'{')
            end_index = self.data.find(b'}')

            self.json_data = self.data[start_index:end_index + 1]
            self.data = self.data[end_index + 1:]

            self.json_data = self.json_data.decode('ascii')
            self.json_data = json.loads(self.json_data)
            self.json_loaded = True

            self.print_server_status()

    def send_message(self, message):
        message = message.encode('ascii')
        length = len(message)
        code_len = pack(b'!I', length)
        message = code_len + message

    def parse_message(self, raw_message):
        message = {}
        message['SRC'] = self.username
        message['DEST'] = b'ALL'
        message['TIMESTAMP'] = int(time.time())
        message['CONTENT'] = b'test_message'

        json_message = json.loads(message)
        print (json_message)

    def print_server_status(self):
        print ("USERS:")
        for user in self.json_data["USER_LIST"]:
            print(user)
        print()
        print("MESSAGES:")
        for message in self.json_data["MESSAGES"][-10:]:
            print ("From: ", message[0], "     ", "To: ", message[1])
            print ("Message: ", message[3])
            print()

    def get_inital_data(self):
        pass

    def connection_lost(self, exc):
        if exc:
            print('Client {} error: {}'.format(self.address, exc))
        elif self.data:
            print('Client {} sent {} but then closed'
                  .format(self.address, self.data))
        else:
            print('Client {} closed socket'.format(self.address))

@asyncio.coroutine
def handle_user_input(loop):
    """reads from stdin in separate thread

    if user inputs 'quit' stops the event loop
    otherwise just echos user input
    """
    while True:
        message = yield from loop.run_in_executor(None, input, "> ")
        if message == "quit":
            loop.stop()
            return
        print(message)

if __name__ == '__main__':
    address = zen_utils.parse_command_line('asyncio server using callbacks')
    loop = asyncio.get_event_loop()
    coro = loop.create_connection(ChatClient, *address)
    server = loop.run_until_complete(coro)

    # Start a task which reads from standard input
    asyncio.async(handle_user_input(loop))

    print('Listening at {}'.format(address))
    try:
        loop.run_forever()
    finally:
        server.close()
        loop.close()

1 Ответ

0 голосов
/ 26 апреля 2018

Это работает в моих тестах, меняя одну строку, используя Python 3.4:

Измените вызов run_until_complete(), чтобы получить пару (transport,protocol) вместо одного значения.

В частностиизмените server = loop.run_until_complete(coro) на transport, protocol = loop.run_until_complete(coro), потому что вызов выполняет сопрограмму и возвращает ее значение, которое представляет собой пару элементов.Первый элемент - это объект сервера, а вторая запись - это объект протокола, который является экземпляром ChatClient.

Запустите сервер, а затем клиента в разных командных окнах.Код клиента:

import asyncio, json #, zen_utils
from struct import *

class ChatClient(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.data = b''
        print('Accepted connection from {}'.format(self.address))

    def data_received(self, data):
        pass

    def compile_server_data(self, data):
        pass

    def send_message(self, message):
        message = message.encode('ascii')
        self.transport.write(message)

    def parse_message(self, raw_message):
        pass

    def print_server_status(self):
        pass

    def get_inital_data(self):
        pass

    def connection_lost(self, exc):
        if exc:
            print('Client {} error: {}'.format(self.address, exc))
        elif self.data:
            print('Client {} sent {} but then closed'
                  .format(self.address, self.data))
        else:
            print('Client {} closed socket'.format(self.address))

@asyncio.coroutine
def handle_user_input(loop, protocol):
    """reads from stdin in separate thread

    if user inputs 'quit' stops the event loop
    otherwise just echos user input
    """
    while True:
        message = yield from loop.run_in_executor(None, input, "> ")
        if message == "quit":
            loop.stop()
            return
        print(message)
        protocol.send_message(message)

if __name__ == '__main__':
    address = "127.0.0.1"
    port = 82

    loop = asyncio.get_event_loop()
    coro = loop.create_connection(ChatClient, address, port) #*address
    transport, protocol = loop.run_until_complete(coro)

    # Start a task which reads from standard input
    asyncio.async(handle_user_input(loop,protocol))

    print('Listening at {}'.format(address))
    try:
        loop.run_forever()
    finally:
        transport.close()
        loop.close()

Код сервера, кредит переходит на этот сайт :

import socket

host = ''        # Symbolic name meaning all available interfaces
port = 82     # Arbitrary non-privileged port
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, port))
s.listen(1)
conn, addr = s.accept()
print('Connected by', addr)
while True:
    data = conn.recv(1024)
    if not data: break

    print( 'Received ', repr(data) )
    if repr(data)=="stop": break

    conn.sendall(data)
conn.close()
...