Ballerina: ошибка веб-сокета при нескольких обратных вызовах - PullRequest
1 голос
/ 28 октября 2019

Я написал прокси-сервер на Ballerina. Этот прокси получает запросы от клиентов через веб-сокет. Задача в том, чтобы сервер Ballerina принимал запрос и пересылал его на сервер с именем «EXTRACTOR». Тот обрабатывает запрос и возвращает ответ серверу Ballerina. На этом этапе сервер Ballerina не возвращает ответ клиенту, сделавшему запрос, а пересылает ответ «EXTRACTOR» на другой сервер, «EXTRACTOR2», который его обрабатывает. Затем «EXTRACTOR2» возвращает обработанную информацию серверу Ballerina, а тот передаёт её клиенту, отправившему исходный запрос. Всё взаимодействие между Ballerina, клиентами, EXTRACTOR и EXTRACTOR2 происходит через веб-сокеты. Проблема в том, что когда EXTRACTOR2 пытается вернуть информацию серверу Ballerina, оказывается, что сделать это невозможно, поскольку соединение уже закрыто.

Это мой код:

import ballerina/http;
import ballerina/log;


// Attribute key under which each WebSocket endpoint stores the next endpoint of the proxy chain.
final string ASSOCIATED_CONNECTION = "EXTRACTOR CONNECTION";
// URL of the first backend; RequestService.onOpen opens a WebSocket client to it.
final string EXTRACTOR = "ws://localhost:9090/basic";
// URL of the second backend; RequestService.onOpen opens a second WebSocket client to it.
// NOTE(review): the Python EXTRACTOR2 server shown with this code binds port 8082 with no
// "/basic" path — confirm this URL actually points at that server.
final string EXTRACTOR2 = "ws://localhost:9091/basic";



// WebSocket entry point of the proxy, served at /api/ws on port 9092.
// For every incoming connection two backend clients are created and the three endpoints
// are chained through the ASSOCIATED_CONNECTION attribute:
//   caller -> EXTRACTOR client -> EXTRACTOR2 client -> caller
@http:WebSocketServiceConfig {
    path: "/api/ws"
}
service RequestService on new http:Listener(9092) {

    // Creates both backend clients (with readyOnConnect disabled), links the chain and
    // only then calls ready() on each client.
    resource function onOpen(http:WebSocketCaller caller) {
        http:WebSocketClient extractorClient = new(EXTRACTOR, {
            callbackService: ClientService1,
            readyOnConnect: false,
            maxFrameSize: 2147483648
        });
        http:WebSocketClient extractor2Client = new(EXTRACTOR2, {
            callbackService: ClientService2,
            readyOnConnect: false,
            maxFrameSize: 2147483648
        });

        // Each endpoint remembers where its incoming frames must be forwarded.
        caller.setAttribute(ASSOCIATED_CONNECTION, extractorClient);
        extractorClient.setAttribute(ASSOCIATED_CONNECTION, extractor2Client);
        extractor2Client.setAttribute(ASSOCIATED_CONNECTION, caller);

        var firstReady = extractorClient->ready();
        if (firstReady is http:WebSocketError) {
            log:printError("Error calling ready on client 1", err = firstReady);
        }

        var secondReady = extractor2Client->ready();
        if (secondReady is http:WebSocketError) {
            log:printError("Error calling ready on client 2", err = secondReady);
        }
    }

    // Forwards a text frame received from the client to the associated EXTRACTOR client.
    resource function onText(http:WebSocketCaller caller, string text, boolean finalFrame) {
        http:WebSocketClient extractorClient = getAssociatedClientEndpoint(caller);
        var pushResult = extractorClient->pushText(text, finalFrame);
        if (pushResult is http:WebSocketError) {
            log:printError("Error occurred when sending text message", err = pushResult);
        }
    }

    // On a client-side error: closes the associated EXTRACTOR client with status 1011,
    // drops the association and logs the original error.
    resource function onError(http:WebSocketCaller caller, error err) {
        http:WebSocketClient extractorClient = getAssociatedClientEndpoint(caller);
        var closeResult = extractorClient->close(statusCode = 1011, reason = "Unexpected condition");
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err = closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hence closing the connection", err = err);
    }

    // On a client-initiated close: closes the associated EXTRACTOR client with the same
    // status code and reason, then drops the association.
    resource function onClose(http:WebSocketCaller caller, int statusCode, string reason) {
        http:WebSocketClient extractorClient = getAssociatedClientEndpoint(caller);
        var closeResult = extractorClient->close(statusCode = statusCode, reason = reason);
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err = closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
}

// Callback service registered for the EXTRACTOR client connection.
// The endpoint stored under ASSOCIATED_CONNECTION on that client is the EXTRACTOR2
// client, so everything received from EXTRACTOR is relayed to EXTRACTOR2.
service ClientService1 = @http:WebSocketServiceConfig {} service {

    // Relays a text frame from EXTRACTOR to the associated client endpoint.
    resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
        http:WebSocketClient nextClient = getAssociatedClientEndpointFromClient(caller);
        var pushResult = nextClient->pushText(text, finalFrame);
        if (pushResult is http:WebSocketError) {
            log:printError("Error occurred when sending text message", err = pushResult);
        }
    }

    // On an error on the EXTRACTOR connection: closes the associated client with status
    // 1011, drops the association and logs the original error.
    resource function onError(http:WebSocketClient caller, error err) {
        http:WebSocketClient nextClient = getAssociatedClientEndpointFromClient(caller);
        var closeResult = nextClient->close(statusCode = 1011, reason = "Unexpected condition");
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hense closing the connection", err = err);
    }

    // When EXTRACTOR closes its connection: closes the associated client with the same
    // status code and reason, then drops the association.
    resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
        http:WebSocketClient nextClient = getAssociatedClientEndpointFromClient(caller);
        var closeResult = nextClient->close(statusCode = statusCode, reason = reason);
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err = closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
};

// Callback service registered for the EXTRACTOR2 client connection.
// The endpoint stored under ASSOCIATED_CONNECTION on that client is the original
// server-side caller, so everything received from EXTRACTOR2 goes back to the client.
service ClientService2 = @http:WebSocketServiceConfig {} service {

    // Relays a text frame from EXTRACTOR2 to the associated server-side caller.
    resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
        http:WebSocketCaller originCaller = getAssociatedServerEndpoint(caller);
        var pushResult = originCaller->pushText(text, finalFrame);
        if (pushResult is http:WebSocketError) {
            log:printError("Error occurred when sending text message", err = pushResult);
        }
    }

    // On an error on the EXTRACTOR2 connection: closes the associated caller with status
    // 1011, drops the association and logs the original error.
    resource function onError(http:WebSocketClient caller, error err) {
        http:WebSocketCaller originCaller = getAssociatedServerEndpoint(caller);
        var closeResult = originCaller->close(statusCode = 1011, reason = "Unexpected condition");
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hense closing the connection", err = err);
    }

    // When EXTRACTOR2 closes its connection: closes the associated caller with the same
    // status code and reason, then drops the association.
    resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
        http:WebSocketCaller originCaller = getAssociatedServerEndpoint(caller);
        var closeResult = originCaller->close(statusCode = statusCode, reason = reason);
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err = closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
};

# Looks up the client endpoint linked to a server-side caller.
#
# + ep - caller whose `ASSOCIATED_CONNECTION` attribute is read
# + return - the attribute value cast to `http:WebSocketClient`
function getAssociatedClientEndpoint(http:WebSocketCaller ep) returns (http:WebSocketClient) {
    return <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
}

# Looks up the server-side caller linked to a client endpoint.
#
# + ep - client endpoint whose `ASSOCIATED_CONNECTION` attribute is read
# + return - the attribute value cast to `http:WebSocketCaller`
function getAssociatedServerEndpoint(http:WebSocketClient ep) returns (http:WebSocketCaller) {
    return <http:WebSocketCaller>ep.getAttribute(ASSOCIATED_CONNECTION);
}

# Looks up the client endpoint linked to another client endpoint.
#
# + ep - client endpoint whose `ASSOCIATED_CONNECTION` attribute is read
# + return - the attribute value cast to `http:WebSocketClient`
function getAssociatedClientEndpointFromClient(http:WebSocketClient ep) returns (http:WebSocketClient) {
    return <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
}

Когда EXTRACTOR2 пытается вернуть информацию на сервер Ballerina, появляется эта ошибка:

Error in connection handler
Traceback (most recent call last):
  File "/home/cluster/.local/lib/python3.7/site-packages/websockets/protocol.py", line 795, in transfer_data
    message = await self.read_message()
  File "/home/cluster/.local/lib/python3.7/site-packages/websockets/protocol.py", line 863, in read_message
    frame = await self.read_data_frame(max_size=self.max_size)
  File "/home/cluster/.local/lib/python3.7/site-packages/websockets/protocol.py", line 938, in read_data_frame
    frame = await self.read_frame(max_size)
  File "/home/cluster/.local/lib/python3.7/site-packages/websockets/protocol.py", line 1018, in read_frame
    extensions=self.extensions,
  File "/home/cluster/.local/lib/python3.7/site-packages/websockets/framing.py", line 121, in read
    data = await reader(2)
  File "/usr/lib/python3.7/asyncio/streams.py", line 677, in readexactly
    raise IncompleteReadError(incomplete, n)
asyncio.streams.IncompleteReadError: 0 bytes read on a total of 2 expected bytes

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/cluster/.local/lib/python3.7/site-packages/websockets/server.py", line 195, in handler
    await self.ws_handler(self, path)
  File "analyzer_main.py", line 39, in server
    await websocket.send(json.dumps(response))
  File "/home/cluster/.local/lib/python3.7/site-packages/websockets/protocol.py", line 530, in send
    await self.ensure_open()
  File "/home/cluster/.local/lib/python3.7/site-packages/websockets/protocol.py", line 771, in ensure_open
    raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedError: code = 1006 (connection closed abnormally [internal]), no reason
Exception found
{"ok": 0, "data": "[ANALYZER] <class 'websockets.exceptions.ConnectionClosedError'>code = 1006 (connection closed abnormally [internal]), no reason"}

Это код Python EXTRACTOR2:

import json
import asyncio
import websockets



from analyzer import Analyzer



# Shared analyzer instance; the connection handler calls its api_get_sentiment method.
analyzer = Analyzer()

# Printed once the analyzer has been constructed.
print("Ready")

async def server(websocket, path):
    """Handle one connection: read a JSON request, analyse it and reply.

    The request must be a JSON object whose ``data`` entry is a list of topics
    with ``word`` and ``context`` keys. Each context is scored with the
    module-level ``analyzer``. The reply is ``{"ok": 1, "data": [...]}`` on
    success and ``{"ok": 0, "data": "[ANALYZER] ..."}`` on failure.

    If the peer has already closed the connection, the error reply cannot be
    delivered either; that second failure is reported on stdout instead of
    escaping the handler.

    Args:
        websocket: connection object with awaitable ``recv()`` and ``send()``.
        path: request path supplied by ``websockets.serve``; unused.
    """
    try:
        request_json = await websocket.recv()
        request = json.loads(request_json)
        print("Peticion recibida")
        print(request_json)

        # Keep a copy of the last request for debugging (overwritten every time).
        with open("log.txt", "w+") as f:
            f.write(request_json)

        # Parse the request
        data = []
        for topic in request['data']:
            sentiment = {}
            sentiment['word'] = topic['word']
            # NOTE(review): api_get_sentiment appears to be a synchronous call;
            # while it runs, the event loop cannot service this connection.
            # Confirm it is fast enough or move it to an executor.
            sentiment['sentiment'] = analyzer.api_get_sentiment(topic['context'])
            data.append(sentiment)
            print("Analizado " + topic['word'])

        # Send the response
        response = {"ok": 1, "data": data}
        print(response)
        await websocket.send(json.dumps(response))

    except asyncio.CancelledError:
        # Cancellation must propagate so the server can shut the task down.
        raise

    except Exception as exception:
        print("Exception found")
        response = {"ok": 0, "data": "[ANALYZER] " + str(type(exception)) + str(exception)}
        print(json.dumps(response))
        try:
            await websocket.send(json.dumps(response))
        except Exception as send_error:
            # Typically the connection is already closed, which is often what
            # brought us here in the first place; nothing more can be sent.
            print("Could not send error response: " + str(send_error))

    finally:
        print("Finished")



# Bind the handler on all interfaces, port 8082, then run the event loop indefinitely.
start_server = websockets.serve(server, "0.0.0.0", 8082)
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(start_server)
event_loop.run_forever()

1 Ответ

2 голоса
/ 31 октября 2019

Я использовал следующий прокси-сервер Ballerina:

import ballerina/http;
import ballerina/log;


// Attribute key under which each WebSocket endpoint stores the next endpoint of the proxy chain.
final string ASSOCIATED_CONNECTION = "EXTRACTOR CONNECTION";
// URL of the first backend; RequestService.onOpen opens a WebSocket client to it.
final string EXTRACTOR = "ws://localhost:9091/basic";
// URL of the second backend; RequestService.onOpen opens a second WebSocket client to it.
final string EXTRACTOR2 = "ws://localhost:8082";



// WebSocket entry point of the proxy, served at /api/ws on port 9092.
// For every incoming connection two backend clients are created and the three endpoints
// are chained through the ASSOCIATED_CONNECTION attribute:
//   caller -> EXTRACTOR client -> EXTRACTOR2 client -> caller
@http:WebSocketServiceConfig {
    path: "/api/ws"
}
service RequestService on new http:Listener(9092) {

    // Creates both backend clients (with readyOnConnect disabled), links the chain and
    // only then calls ready() on each client.
    resource function onOpen(http:WebSocketCaller caller) {
        http:WebSocketClient extractorClient = new(EXTRACTOR, {
            callbackService: ClientService1,
            readyOnConnect: false,
            maxFrameSize: 2147483648
        });
        http:WebSocketClient extractor2Client = new(EXTRACTOR2, {
            callbackService: ClientService2,
            readyOnConnect: false,
            maxFrameSize: 2147483648
        });

        // Each endpoint remembers where its incoming frames must be forwarded.
        caller.setAttribute(ASSOCIATED_CONNECTION, extractorClient);
        extractorClient.setAttribute(ASSOCIATED_CONNECTION, extractor2Client);
        extractor2Client.setAttribute(ASSOCIATED_CONNECTION, caller);

        var firstReady = extractorClient->ready();
        if (firstReady is http:WebSocketError) {
            log:printError("Error calling ready on client 1", err = firstReady);
        }

        var secondReady = extractor2Client->ready();
        if (secondReady is http:WebSocketError) {
            log:printError("Error calling ready on client 2", err = secondReady);
        }
    }

    // Forwards a text frame received from the client to the associated EXTRACTOR client.
    resource function onText(http:WebSocketCaller caller, string text, boolean finalFrame) {
        http:WebSocketClient extractorClient = getAssociatedClientEndpoint(caller);
        var pushResult = extractorClient->pushText(text, finalFrame);
        if (pushResult is http:WebSocketError) {
            log:printError("Error occurred when sending text message", err = pushResult);
        }
    }

    // On a client-side error: closes the associated EXTRACTOR client with status 1011,
    // drops the association and logs the original error.
    resource function onError(http:WebSocketCaller caller, error err) {
        http:WebSocketClient extractorClient = getAssociatedClientEndpoint(caller);
        var closeResult = extractorClient->close(statusCode = 1011, reason = "Unexpected condition");
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err = closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hence closing the connection", err = err);
    }

    // On a client-initiated close: closes the associated EXTRACTOR client with the same
    // status code and reason, then drops the association.
    resource function onClose(http:WebSocketCaller caller, int statusCode, string reason) {
        http:WebSocketClient extractorClient = getAssociatedClientEndpoint(caller);
        var closeResult = extractorClient->close(statusCode = statusCode, reason = reason);
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err = closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
}

// Callback service registered for the EXTRACTOR client connection.
// The endpoint stored under ASSOCIATED_CONNECTION on that client is the EXTRACTOR2
// client, so everything received from EXTRACTOR is relayed to EXTRACTOR2.
service ClientService1 = @http:WebSocketServiceConfig {} service {

    // Relays a text frame from EXTRACTOR to the associated client endpoint.
    resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
        http:WebSocketClient nextClient = getAssociatedClientEndpointFromClient(caller);
        var pushResult = nextClient->pushText(text, finalFrame);
        if (pushResult is http:WebSocketError) {
            log:printError("Error occurred when sending text message", err = pushResult);
        }
    }

    // On an error on the EXTRACTOR connection: closes the associated client with status
    // 1011, drops the association and logs the original error.
    resource function onError(http:WebSocketClient caller, error err) {
        http:WebSocketClient nextClient = getAssociatedClientEndpointFromClient(caller);
        var closeResult = nextClient->close(statusCode = 1011, reason = "Unexpected condition");
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hense closing the connection", err = err);
    }

    // When EXTRACTOR closes its connection: closes the associated client with the same
    // status code and reason, then drops the association.
    resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
        http:WebSocketClient nextClient = getAssociatedClientEndpointFromClient(caller);
        var closeResult = nextClient->close(statusCode = statusCode, reason = reason);
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err = closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
};

// Callback service registered for the EXTRACTOR2 client connection.
// The endpoint stored under ASSOCIATED_CONNECTION on that client is the original
// server-side caller, so everything received from EXTRACTOR2 goes back to the client.
service ClientService2 = @http:WebSocketServiceConfig {} service {

    // Relays a text frame from EXTRACTOR2 to the associated server-side caller.
    resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
        http:WebSocketCaller originCaller = getAssociatedServerEndpoint(caller);
        var pushResult = originCaller->pushText(text, finalFrame);
        if (pushResult is http:WebSocketError) {
            log:printError("Error occurred when sending text message", err = pushResult);
        }
    }

    // On an error on the EXTRACTOR2 connection: closes the associated caller with status
    // 1011, drops the association and logs the original error.
    resource function onError(http:WebSocketClient caller, error err) {
        http:WebSocketCaller originCaller = getAssociatedServerEndpoint(caller);
        var closeResult = originCaller->close(statusCode = 1011, reason = "Unexpected condition");
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hense closing the connection", err = err);
    }

    // When EXTRACTOR2 closes its connection: closes the associated caller with the same
    // status code and reason, then drops the association.
    resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
        http:WebSocketCaller originCaller = getAssociatedServerEndpoint(caller);
        var closeResult = originCaller->close(statusCode = statusCode, reason = reason);
        if (closeResult is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err = closeResult);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
};

# Looks up the client endpoint linked to a server-side caller.
#
# + ep - caller whose `ASSOCIATED_CONNECTION` attribute is read
# + return - the attribute value cast to `http:WebSocketClient`
function getAssociatedClientEndpoint(http:WebSocketCaller ep) returns (http:WebSocketClient) {
    return <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
}

# Looks up the server-side caller linked to a client endpoint.
#
# + ep - client endpoint whose `ASSOCIATED_CONNECTION` attribute is read
# + return - the attribute value cast to `http:WebSocketCaller`
function getAssociatedServerEndpoint(http:WebSocketClient ep) returns (http:WebSocketCaller) {
    return <http:WebSocketCaller>ep.getAttribute(ASSOCIATED_CONNECTION);
}

# Looks up the client endpoint linked to another client endpoint.
#
# + ep - client endpoint whose `ASSOCIATED_CONNECTION` attribute is read
# + return - the attribute value cast to `http:WebSocketClient`
function getAssociatedClientEndpointFromClient(http:WebSocketClient ep) returns (http:WebSocketClient) {
    return <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
}

и этот Python-сервер, слегка изменённый для моего тестирования:

import json
import asyncio
import websockets

# Startup marker, printed before the server is bound.
print("Ready")

async def server(websocket, path):
    """Handle one connection: wait for a request and answer with a fixed payload.

    The content of the incoming message is ignored. The reply is
    ``{"ok": 1, "data": ["hello"]}`` on success and
    ``{"ok": 0, "data": "[ANALYZER] ..."}`` on failure.

    If the peer has already closed the connection, the error reply cannot be
    delivered either; that second failure is reported on stdout instead of
    escaping the handler.

    Args:
        websocket: connection object with awaitable ``recv()`` and ``send()``.
        path: request path supplied by ``websockets.serve``; unused.
    """
    try:
        # Only wait for the request to arrive; its content is not used.
        await websocket.recv()
        print("Peticion recibida")

        data = ["hello"]
        response = {"ok": 1, "data": data}
        print(response)
        await websocket.send(json.dumps(response))

    except asyncio.CancelledError:
        # Cancellation must propagate so the server can shut the task down.
        raise

    except Exception as exception:
        print("Exception found")
        response = {"ok": 0, "data": "[ANALYZER] " + str(type(exception)) + str(exception)}
        print(json.dumps(response))
        try:
            await websocket.send(json.dumps(response))
        except Exception as send_error:
            # Typically the connection is already closed; nothing more can be sent.
            print("Could not send error response: " + str(send_error))

    finally:
        print("Finished")

# Bind the handler on all interfaces, port 8082, then run the event loop indefinitely.
start_server = websockets.serve(server, "0.0.0.0", 8082)
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(start_server)
event_loop.run_forever()

У меня это прекрасно работает.

...