Как прокси на несколько серверов в WebSocket - PullRequest
2 голосов
/ 21 октября 2019

У меня есть запрограммированный в балерине прокси-сервер, который получает запрос от пользователя к веб-сокету и отправляет его через другой веб-сокет службе, которая его обрабатывает. В настоящее время, когда служба отправляет мне ответ, сценарий балерины возвращает этот ответ пользователю. Я хотел бы сделать это, как только я получу ответ от службы, он будет отправлен в другую службу для выполнения второй обработки, и когда эта служба ответит и вернет его пользователю.

У меня есть следующий код:

import ballerina/http;
import ballerina/log;


final string ASSOCIATED_CONNECTION = "EXTRACTOR CONNECTION";
final string EXTRACTOR = "ws://192.168.10.248:8081";



@http:WebSocketServiceConfig {
    path: "/api/ws"
}

service RequestService on new http:Listener(9091) {

  resource function onOpen(http:WebSocketCaller caller) {

        http:WebSocketClient wsClientEp = new(
            EXTRACTOR,
            {callbackService: ClientService,
            readyOnConnect: false,
            maxFrameSize: 2147483648
        });

        wsClientEp.setAttribute(ASSOCIATED_CONNECTION, caller);
        caller.setAttribute(ASSOCIATED_CONNECTION, wsClientEp);

        var err = wsClientEp->ready();
        if (err is http:WebSocketError) {
            log:printError("Error calling ready on client", err);
        }
    }

    resource function onText(http:WebSocketCaller caller, string text, boolean finalFrame) {

        http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
        var err = clientEp->pushText(text, finalFrame);
        if (err is http:WebSocketError) {
            log:printError("Error occurred when sending text message", err);
        }
    }


    resource function onError(http:WebSocketCaller caller, error err) {

       http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
       var e = clientEp->close(statusCode = 1011, reason = "Unexpected condition");
       if (e is http:WebSocketError) {
           log:printError("Error occurred when closing the connection", e);
       }
       _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
       log:printError("Unexpected error hence closing the connection", err);
   }

   resource function onClose(http:WebSocketCaller caller, int statusCode, string reason) {

       http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
       var err = clientEp->close(statusCode = statusCode, reason = reason);
       if (err is http:WebSocketError) {
           log:printError("Error occurred when closing the connection", err);
       }
       _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
   }
}

service ClientService = @http:WebSocketServiceConfig {} service {

    resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
        http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
        var err = serverEp->pushText(text, finalFrame);
        if (err is http:WebSocketError) {
            log:printError("Error occurred when sending text message", err);
        }
    }

    resource function onError(http:WebSocketClient caller, error err) {
        http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
        var e = serverEp->close(statusCode = 1011, reason = "Unexpected condition");
        if (e is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err = e);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hense closing the connection", err);
    }

    resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
        http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
        var err = serverEp->close(statusCode = statusCode, reason = reason);
            if (err is http:WebSocketError) {
                log:printError("Error occurred when closing the connection", err);
            }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
};

function getAssociatedClientEndpoint(http:WebSocketCaller ep) returns (http:WebSocketClient) {
    http:WebSocketClient wsClient = <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
    return wsClient;
}

function getAssociatedServerEndpoint(http:WebSocketClient ep) returns (http:WebSocketCaller) {
    http:WebSocketCaller wsEndpoint = <http:WebSocketCaller>ep.getAttribute(ASSOCIATED_CONNECTION);
    return wsEndpoint;
}

Я сделал следующий конвейер: Клиент -> Балерина прокси -> Сервис1 -> Балерина прокси -> Клиент

Я хочу: Клиент -> Балерина прокси-> Сервис1 -> Балерина прокси -> Сервис2 -> Балерина прокси -> Клиент

1 Ответ

3 голосов
/ 21 октября 2019

Мы можем связать связанные соединения

caller has wsClientEp
wsClientEp has wsClientEp2
wsClientEp2 has caller

Я написал грубый код, я думаю, что следующий код должен работать. Обязательно измените имена переменных:

import ballerina/http;
import ballerina/log;


final string ASSOCIATED_CONNECTION = "EXTRACTOR CONNECTION";
final string EXTRACTOR = "ws://localhost:9090/basic";
final string EXTRACTOR2 = "ws://localhost:9091/basic";



@http:WebSocketServiceConfig {
    path: "/api/ws"
}

service RequestService on new http:Listener(9092) {

  resource function onOpen(http:WebSocketCaller caller) {

        http:WebSocketClient wsClientEp = new(
            EXTRACTOR,
            {callbackService: ClientService1,
            readyOnConnect: false,
            maxFrameSize: 2147483648
        });
        http:WebSocketClient wsClientEp2 = new(
            EXTRACTOR2,
            {callbackService: ClientService2,
            readyOnConnect: false,
            maxFrameSize: 2147483648
        });

        caller.setAttribute(ASSOCIATED_CONNECTION, wsClientEp);
        wsClientEp.setAttribute(ASSOCIATED_CONNECTION, wsClientEp2);
        wsClientEp2.setAttribute(ASSOCIATED_CONNECTION, caller);

        var err = wsClientEp->ready();
        if (err is http:WebSocketError) {
            log:printError("Error calling ready on client 1", err);
        }

        err = wsClientEp2->ready();
        if (err is http:WebSocketError) {
            log:printError("Error calling ready on client 2", err);
        }
    }

    resource function onText(http:WebSocketCaller caller, string text, boolean finalFrame) {

        http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
        var err = clientEp->pushText(text, finalFrame);
        if (err is http:WebSocketError) {
            log:printError("Error occurred when sending text message", err);
        }
    }


    resource function onError(http:WebSocketCaller caller, error err) {

       http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
       var e = clientEp->close(statusCode = 1011, reason = "Unexpected condition");
       if (e is http:WebSocketError) {
           log:printError("Error occurred when closing the connection", e);
       }
       _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
       log:printError("Unexpected error hence closing the connection", err);
   }

   resource function onClose(http:WebSocketCaller caller, int statusCode, string reason) {

       http:WebSocketClient clientEp = getAssociatedClientEndpoint(caller);
       var err = clientEp->close(statusCode = statusCode, reason = reason);
       if (err is http:WebSocketError) {
           log:printError("Error occurred when closing the connection", err);
       }
       _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
   }
}

service ClientService1 = @http:WebSocketServiceConfig {} service {

    resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
        http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
        var err = clientEp2->pushText(text, finalFrame);
        if (err is http:WebSocketError) {
            log:printError("Error occurred when sending text message", err);
        }
    }

    resource function onError(http:WebSocketClient caller, error err) {
        http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
        var e = clientEp2->close(statusCode = 1011, reason = "Unexpected condition");
        if (e is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err = e);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hense closing the connection", err);
    }

    resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
        http:WebSocketClient clientEp2 = getAssociatedClientEndpointFromClient(caller);
        var err = clientEp2->close(statusCode = statusCode, reason = reason);
            if (err is http:WebSocketError) {
                log:printError("Error occurred when closing the connection", err);
            }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
};

service ClientService2 = @http:WebSocketServiceConfig {} service {

    resource function onText(http:WebSocketClient caller, string text, boolean finalFrame) {
        http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
        var err = serverEp->pushText(text, finalFrame);
        if (err is http:WebSocketError) {
            log:printError("Error occurred when sending text message", err);
        }
    }

    resource function onError(http:WebSocketClient caller, error err) {
        http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
        var e = serverEp->close(statusCode = 1011, reason = "Unexpected condition");
        if (e is http:WebSocketError) {
            log:printError("Error occurred when closing the connection", err = e);
        }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
        log:printError("Unexpected error hense closing the connection", err);
    }

    resource function onClose(http:WebSocketClient caller, int statusCode, string reason) {
        http:WebSocketCaller serverEp = getAssociatedServerEndpoint(caller);
        var err = serverEp->close(statusCode = statusCode, reason = reason);
            if (err is http:WebSocketError) {
                log:printError("Error occurred when closing the connection", err);
            }
        _ = caller.removeAttribute(ASSOCIATED_CONNECTION);
    }
};

function getAssociatedClientEndpoint(http:WebSocketCaller ep) returns (http:WebSocketClient) {
    http:WebSocketClient wsClient = <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
    return wsClient;
}

function getAssociatedServerEndpoint(http:WebSocketClient ep) returns (http:WebSocketCaller) {
    http:WebSocketCaller wsEndpoint = <http:WebSocketCaller>ep.getAttribute(ASSOCIATED_CONNECTION);
    return wsEndpoint;
}

function getAssociatedClientEndpointFromClient(http:WebSocketClient ep) returns (http:WebSocketClient) {
    http:WebSocketClient wsEndpoint = <http:WebSocketClient>ep.getAttribute(ASSOCIATED_CONNECTION);
    return wsEndpoint;
}

Обратите внимание, что я немного изменил URL для тестирования.

...