Asyncio, как повторно использовать сокет - PullRequest
1 голос
/ 27 апреля 2019

Как бы я повторно использовал сокет для сервера в asyncio?Вместо создания нового соединения для каждого запроса?

Вот мой код;

async def lookup(server, port, query, sema):
    async with sema as sema:
        try:
            reader, writer = await asyncio.open_connection(server, port)
        except:
            return {}
        writer.write(query.encode("ISO-8859-1"))
        await writer.drain()
        data = b""
        while True:
            d = await reader.read(4096)
            if not d:
                break
            data += d
        writer.close()
        data = data.decode("ISO-8859-1")
        return data

Ответы [ 2 ]

2 голосов
/ 27 апреля 2019

Вы просто вызываете сопрограмму asyncio.open_connection(server, port) только один раз и продолжаете использовать устройство чтения и записи (если, конечно, сервер не просто закрывает соединение на их конце).

Я бы сделал это в отдельном объекте диспетчера асинхронного контекста для ваших соединений и использовал бы пул соединений для управления соединениями, чтобы вы могли создавать и повторно использовать сокет соединения для многих одновременных задач. Используя (асинхронный) менеджер контекста, Python обязательно уведомляет соединение, когда ваш код завершает работу с ним, чтобы соединение могло быть возвращено обратно в пул:

import asyncio
import contextlib

from collections import OrderedDict
from types import TracebackType
from typing import Any, List, Optional, Tuple, Type


try:  # Python 3.7
    base = contextlib.AbstractAsyncContextManager
except AttributeError:
    base = object  # type: ignore

Server = str
Port = int
Host = Tuple[Server, Port]


class ConnectionPool(base):
    def __init__(
        self,
        max_connections: int = 1000,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ):
        self.max_connections = max_connections
        self._loop = loop or asyncio.get_event_loop()

        self._connections: OrderedDict[Host, List["Connection"]] = OrderedDict()
        self._semaphore = asyncio.Semaphore(max_connections)

    async def connect(self, server: Server, port: Port) -> "Connection":
        host = (server, port)

        # enforce the connection limit, releasing connections notifies
        # the semaphore to release here
        await self._semaphore.acquire()

        connections = self._connections.setdefault(host, [])
        # find an un-used connection for this host
        connection = next((conn for conn in connections if not conn.in_use), None)
        if connection is None:
            # disconnect the least-recently-used un-used connection to make space
            # for a new connection. There will be at least one.
            for conns_per_host in reversed(self._connections.values()):
                for conn in conns_per_host:
                    if not conn.in_use:
                        await conn.close()
                        break

            reader, writer = await asyncio.open_connection(server, port)
            connection = Connection(self, host, reader, writer)
            connections.append(connection)

        connection.in_use = True
        # move current host to the front as most-recently used
        self._connections.move_to_end(host, False)

        return connection

    async def close(self):
        """Close all connections"""
        connections = [c for cs in self._connections.values() for c in cs]
        self._connections = OrderedDict()
        for connection in connections:
            await connection.close()

    def _remove(self, connection):
        conns_for_host = self._connections.get(connection._host)
        if not conns_for_host:
            return
        conns_for_host[:] = [c for c in conns_for_host if c != connection]

    def _notify_release(self):
        self._semaphore.release()

    async def __aenter__(self) -> "ConnectionPool":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        await self.close()

    def __del__(self) -> None:
        connections = [repr(c) for cs in self._connections.values() for c in cs]
        if not connections:
            return

        context = {
            "pool": self,
            "connections": connections,
            "message": "Unclosed connection pool",
        }
        self._loop.call_exception_handler(context)


class Connection(base):
    def __init__(
        self,
        pool: ConnectionPool,
        host: Host,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
    ):
        self._host = host
        self._pool = pool
        self._reader = reader
        self._writer = writer
        self._closed = False
        self.in_use = False

    def __repr__(self):
        host = f"{self._host[0]}:{self._host[1]}"
        return f"Connection<{host}>"

    @property
    def closed(self):
        return self._closed

    def release(self) -> None:
        self.in_use = False
        self._pool._notify_release()

    async def close(self) -> None:
        if self._closed:
            return
        self._closed = True
        self._writer.close()
        self._pool._remove(self)
        try:
            await self._writer.wait_closed()
        except AttributeError:  # wait_closed is new in 3.7
            pass

    def __getattr__(self, name: str) -> Any:
        """All unknown attributes are delegated to the reader and writer"""
        if self._closed or not self.in_use:
            raise ValueError("Can't use a closed or unacquired connection")
        if hasattr(self._reader, name):
            return getattr(self._reader, name)
        return getattr(self._writer, name)

    async def __aenter__(self) -> "Connection":
        if self._closed or not self.in_use:
            raise ValueError("Can't use a closed or unacquired connection")
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        self.release()

    def __del__(self) -> None:
        if self._closed:
            return
        context = {"connection": self, "message": "Unclosed connection"}
        self._pool._loop.call_exception_handler(context)

затем передайте объект пула вашей поисковой сопрограмме; объект соединения создает прокси для частей чтения и записи:

async def lookup(pool, server, port, query):
    try:
        conn = await pool.connect(server, port)
    except (ValueError, OSError):
        return {}

    async with conn:
        conn.write(query.encode("ISO-8859-1"))
        await conn.drain()
        data = b""
        while True:
            d = await conn.read(4096)
            if not d:
                break
            data += d
        data = data.decode("ISO-8859-1")
        return data

Обратите внимание, что стандартный протокол WHOIS (RFC 3912 или предшественники ) устанавливает, что соединение закрывается после каждого запроса. Если вы подключаетесь к стандартной службе WHOIS через порт 43, нет смысла используя сокеты.

В этом случае происходит то, что считыватель достигнет EOF (reader.at_eof() верно), и любые дальнейшие попытки чтения просто ничего не будут возвращать (reader.read(...) всегда будет возвращать пустое значение b''). Запись в устройство записи не будет ошибкой, пока соединение сокета не будет прервано удаленной стороной после истечения времени ожидания. Вы можете написать все, что хотите, для подключения, но сервер WHOIS будет просто игнорировать запросы.

1 голос
/ 27 апреля 2019

Вы можете создать кэш соединений, сохраняя пары считыватель / записывающее устройство в глобальном словаре.

# at top-level
connections = {}

Затем в lookup замените вызов на open_connection кодом, который сначала проверяет диктовку.:

if (server, port) in connections:
    reader, writer = connections[server, port]
else:
    reader, writer = await asyncio.open_connection(server, port)
    connections[server, port] = reader, writer
...