как реализовать обратный прокси с поддержкой websocket с помощью aiohttp (python 3.6) - PullRequest
0 голосов
/ 18 сентября 2018

Я пытаюсь реализовать специальный обратный прокси-сервер для ноутбуков Jupyter с помощью aiohttp. Он отлично работает для запросов http, но переадресация websocket не работает. Запросы от браузера приходят и перенаправляются, но от jupyter нет никаких ответов. Я предполагаю, что мой клиентский код websocket как-то не реагирует на входящие сообщения от jupyter.

Единственным указанием на стороне юпитера, что что-то не так, являются сообщения, подобные этому:

WebSocket ping timeout after 90009 ms.

так вот моя попытка написания прокси

from aiohttp import web
from aiohttp import client
import aiohttp
import logging
import pprint

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


baseUrl = 'http://0.0.0.0:8888'
mountPoint = '/fakeUuid'

async def handler(req):
    proxyPath = req.match_info.get('proxyPath','no proxyPath placeholder defined')
    reqH = req.headers.copy()
    if reqH['connection'] == 'Upgrade' and reqH['upgrade'] == 'websocket' and req.method == 'GET':

      ws_server = web.WebSocketResponse()
      await ws_server.prepare(req)
      logger.info('##### WS_SERVER %s' % pprint.pformat(ws_server))

      client_session = aiohttp.ClientSession()
      async with client_session.ws_connect(baseUrl+req.path_qs,
        headers = { 'cookie': reqH['cookie'] },
      ) as ws_client:
        logger.info('##### WS_CLIENT %s' % pprint.pformat(ws_client))

        async for server_msg in ws_server:
          logger.info('>>> msg from browser: %s',pprint.pformat(server_msg))
          if server_msg.type == aiohttp.WSMsgType.TEXT:
            await ws_client.send_str(server_msg.data)
          else:
            await ws_client.send_bytes(server_msg.data)

        async for client_msg in ws_client:
          logger.info('>>> msg from jupyter: %s',pprint.pformat(client_msg))
          if client_msg.tp == aiohttp.WSMsgType.TEXT:
            await ws_server.send_str(client_msg.data)
          else:
            await ws_server.send_bytes(client_msg.data)

        return ws_server
    else:
      async with client.request(
          req.method,baseUrl+mountPoint+proxyPath,
          headers = reqH,
          allow_redirects=False,
          data = await req.read()
      ) as res:
          headers = res.headers.copy()
          body = await res.read()
          return web.Response(
            headers = headers,
            status = res.status,
            body = body
          )
      return ws_server

app = web.Application()
app.router.add_route('*',mountPoint + '{proxyPath:.*}', handler)
web.run_app(app,port=3984)

1 Ответ

0 голосов
/ 19 сентября 2018

Извлеченный урок: два async for блокируют поток текущей функции.Запустив их с asyncio.wait, я могу заставить их работать одновременно.Результирующая программа выглядит так:

from aiohttp import web
from aiohttp import client
import aiohttp
import asyncio
import logging
import pprint

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


baseUrl = 'http://0.0.0.0:8888'
mountPoint = '/fakeUuid'

async def handler(req):
    proxyPath = req.match_info.get('proxyPath','no proxyPath placeholder defined')
    reqH = req.headers.copy()
    if reqH['connection'] == 'Upgrade' and reqH['upgrade'] == 'websocket' and req.method == 'GET':

      ws_server = web.WebSocketResponse()
      await ws_server.prepare(req)
      logger.info('##### WS_SERVER %s' % pprint.pformat(ws_server))

      client_session = aiohttp.ClientSession(cookies=req.cookies)
      async with client_session.ws_connect(
        baseUrl+req.path_qs,
      },
      ) as ws_client:
        logger.info('##### WS_CLIENT %s' % pprint.pformat(ws_client))

        async def wsforward(ws_from,ws_to):
          async for msg in ws_from:
            logger.info('>>> msg: %s',pprint.pformat(msg))
            mt = msg.type
            md = msg.data
            if mt == aiohttp.WSMsgType.TEXT:
              await ws_to.send_str(md)
            elif mt == aiohttp.WSMsgType.BINARY:
              await ws_to.send_bytes(md)
            elif mt == aiohttp.WSMsgType.PING:
              await ws_to.ping()
            elif mt == aiohttp.WSMsgType.PONG:
              await ws_to.pong()
            elif ws_to.closed:
              await ws_to.close(code=ws_to.close_code,message=msg.extra)
            else:
              raise ValueError('unexpecte message type: %s',pprint.pformat(msg))

        finished,unfinished = await asyncio.wait([wsforward(ws_server,ws_client),wsforward(ws_client,ws_server)],return_when=asyncio.FIRST_COMPLETED)

        return ws_server
    else:
      async with client.request(
          req.method,baseUrl+mountPoint+proxyPath,
          headers = reqH,
          allow_redirects=False,
          data = await req.read()
      ) as res:
          headers = res.headers.copy()
          body = await res.read()
          return web.Response(
            headers = headers,
            status = res.status,
            body = body
          )
      return ws_server

app = web.Application()
app.router.add_route('*',mountPoint + '{proxyPath:.*}', handler)
web.run_app(app,port=3984)
...