Как я могу разорвать соединение akka websocket со стороны сервера? - PullRequest
0 голосов
/ 25 октября 2018

Мне кажется, что я не могу разорвать соединение через веб-сокет со стороны сервера, используя рекомендованный подход — вызов метода terminate() у ServerBinding.

Я включил приведенный ниже код, который устанавливает соединение, завершает его на стороне сервера, ожидает завершения, затем отправляет сообщения от клиента. Все сообщения успешно обрабатываются сервером после завершения.

Как разорвать соединение через веб-сокет со стороны сервера?

/**
 * Reproduction test for terminating a WebSocket connection from the server side.
 *
 * <p>The test binds a WebSocket server on {@code localhost:9999}, connects a client to it,
 * calls {@code ServerBinding.terminate} with a one second deadline and, five seconds after
 * termination has been reported, offers four messages through the client-side queue. It then
 * waits up to fifteen seconds for the client connection to close.
 */
public class AkkaWebSocketServerTerminateTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(AkkaWebSocketServerTerminateTest.class);

    /**
     * Connects a client to the server, terminates the server binding and then tries to push
     * messages from the client over the supposedly terminated connection.
     *
     * <p>A timeout or an interruption while waiting for the client connection to close is
     * logged and does not fail the test.
     *
     * @throws ExecutionException   if binding the server or waiting for the client to close fails
     * @throws InterruptedException if the thread is interrupted while waiting for the binding
     * @throws TimeoutException     if the server does not bind within three seconds
     */
    @Test
    public void clientConnectsServer_ServerGracefullyTerminatesConnection() throws ExecutionException, InterruptedException, TimeoutException {
        ActorSystem system = ActorSystem.create();
        try {
            Materializer materializer = ActorMaterializer.create(system);
            Http http = Http.get(system);

            /* SERVER */
            // Every incoming text message is answered with "handled <message>".
            Flow<Message, Message, CompletionStage<Done>> serverSideHandlerFlow = Flow
                    .of(Message.class)
                    .via(WebsocketLayer.messageToStringFlow())
                    .map(s -> {
                        LOGGER.debug("handling {}", s);
                        return "handled " + s;
                    })
                    .map(s -> (Message) TextMessage.create(s))
                    .alsoToMat(Sink.ignore(), Keep.right());

            CompletionStage<ServerBinding> serverBindingCompletionStage = http.bindAndHandleSync(
                    httpRequest -> WebSocket.handleWebSocketRequestWith(httpRequest, serverSideHandlerFlow),
                    ConnectHttp.toHost("localhost", 9999),
                    materializer);

            // wait for binding
            ServerBinding serverBinding = serverBindingCompletionStage.toCompletableFuture().get(3, TimeUnit.SECONDS);

            /* CLIENT */
            // Incoming side: log everything the server sends back.
            Sink<Message, CompletionStage<Done>> sink
                    = Flow.of(Message.class)
                    .via(WebsocketLayer.messageToStringFlow())
                    .toMat(Sink.foreach(s -> LOGGER.debug("client received message: '{}'", s)), Keep.right());

            // The queue only exists once the stream is materialized, so it is handed out through a future.
            CompletableFuture<SourceQueueWithComplete<String>> futureClientSideSourceQueue = new CompletableFuture<>();

            // Outgoing side: strings offered to the queue are logged and sent as text messages.
            Source<Message, SourceQueueWithComplete<String>> source
                    = Source.<String>queue(0, OverflowStrategy.backpressure())
                    .alsoToMat(Sink.foreach(s -> LOGGER.debug("client sending '{}'", s)), Keep.left())
                    .map(s -> (Message) TextMessage.create(s))
                    .mapMaterializedValue(sourceQueue -> {
                        futureClientSideSourceQueue.complete(sourceQueue);
                        return sourceQueue;
                    });
            Flow<Message, Message, CompletionStage<Done>> clientFlow = Flow.fromSinkAndSourceCoupledMat(sink, source, Keep.left());

            WebSocketRequest webSocketRequest
                    = WebSocketRequest.create("ws://localhost:9999");
            Pair<CompletionStage<WebSocketUpgradeResponse>, CompletionStage<Done>> clientPair
                    = http.singleWebSocketRequest(webSocketRequest,
                                                  clientFlow,
                                                  materializer);

            CompletionStage<WebSocketUpgradeResponse> clientSideUpgradeResponse = clientPair.first();

            // The connection counts as established only if the server switched protocols.
            CompletionStage<Done> clientSideConnected = clientSideUpgradeResponse.thenApply(upgrade -> {
                if (upgrade.response().status().equals(StatusCodes.SWITCHING_PROTOCOLS)) {
                    return Done.getInstance();
                } else {
                    throw new RuntimeException("Connection failed: " + upgrade.response().status());
                }
            });
            CompletionStage<Done> clientSideClosed = clientPair.second();

            clientSideConnected.thenAccept(done -> {
                LOGGER.debug("Client connected");
                LOGGER.debug("Terminating all connections with a 1 second hard deadline");
                CompletionStage<HttpTerminated> onceAllConnectionsTerminated
                        = serverBinding.terminate(Duration.ofSeconds(1));
                serverBinding.whenTerminated().thenAccept(terminated -> {
                    LOGGER.debug("whenTerminated() -> terminated");
                });
                onceAllConnectionsTerminated.toCompletableFuture()
                        .thenAccept(terminated -> {
                            LOGGER.debug("All connections terminated.");
                            try {
                                LOGGER.debug("Waiting 5 seconds before sending messages from the client to the terminated server.");
                                // NOTE: this blocks whichever thread completes the termination stage.
                                Thread.sleep(5000); // wait 5 seconds
                                SourceQueueWithComplete<String> queue = futureClientSideSourceQueue.get();
                                queue.offer("message 1");
                                queue.offer("message 2");
                                queue.offer("message 3");
                                queue.offer("message 4");
                            } catch (InterruptedException e) {
                                // Restore the interrupt flag so the owning thread can react to it.
                                Thread.currentThread().interrupt();
                                LOGGER.error("Interrupted before sending messages from the client", e);
                            } catch (ExecutionException e) {
                                LOGGER.error("Failed to obtain the client side source queue", e);
                            }
                        });
            });

            try {
                LOGGER.debug("Waiting 15 seconds for client connection to close.");
                clientSideClosed
                        .thenAccept(done -> LOGGER.debug("Client connection closed"))
                        .toCompletableFuture()
                        .get(15, TimeUnit.SECONDS); // wait for connection to close
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently swallowing the interruption.
                Thread.currentThread().interrupt();
                LOGGER.error("Interrupted waiting for client connection to close", e);
            } catch (TimeoutException e) {
                LOGGER.error("Timeout waiting for client connection to close", e);
            }
        } finally {
            // Always shut the actor system down so its threads do not leak into later tests.
            system.terminate();
        }
    }
}

Я получаю следующий вывод:

[DEBUG] [10/25/2018 12:14:18.763] [main] [EventStream(akka://default)] logger log1-Logging$DefaultLogger started
[DEBUG] [10/25/2018 12:14:18.780] [main] [EventStream(akka://default)] Default Loggers started
[DEBUG] [10/25/2018 12:14:19.297] [main] [AkkaSSLConfig(akka://default)] Initializing AkkaSSLConfig extension...
[DEBUG] [10/25/2018 12:14:19.306] [main] [AkkaSSLConfig(akka://default)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.DefaultHostnameVerifier@31c7528f
[DEBUG] [10/25/2018 12:14:20.343] [default-akka.actor.default-dispatcher-6] 
[akka://default/system/IO-TCP/selectors/$a/0] Successfully bound to /127.0.0.1:9999
2018-10-25 12:14:20 DEBUG AkkaWebSocketServerTerminateTest:130 - Waiting 15 seconds for client connection to close.
[DEBUG] [10/25/2018 12:14:20.479] [default-akka.actor.default-dispatcher-4] 
[akka://default/system/IO-TCP/selectors/$a/1] Resolving localhost before connecting
[DEBUG] [10/25/2018 12:14:20.496] [default-akka.actor.default-dispatcher-6] 
[akka://default/system/IO-DNS] Resolution request for localhost from Actor[akka://default/system/IO-TCP/selectors/$a/1#1366663132]
[DEBUG] [10/25/2018 12:14:20.536] [default-akka.actor.default-dispatcher-6] 
[akka://default/system/IO-TCP/selectors/$a/1] Attempting connection to [localhost/127.0.0.1:9999]
[DEBUG] [10/25/2018 12:14:20.537] [default-akka.actor.default-dispatcher-4] 
[akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [10/25/2018 12:14:20.538] [default-akka.actor.default-dispatcher-6] 
[akka://default/system/IO-TCP/selectors/$a/1] Connection established to [localhost:9999]
2018-10-25 12:14:20 DEBUG AkkaWebSocketServerTerminateTest:104 - Client connected
2018-10-25 12:14:20 DEBUG AkkaWebSocketServerTerminateTest:105 - Terminating all connections with a 1 second hard deadline
[DEBUG] [10/25/2018 12:14:20.642] [default-akka.actor.default-dispatcher-13] 
[akka://default/system/IO-TCP/selectors/$a/0] Unbinding endpoint /127.0.0.1:9999
[DEBUG] [10/25/2018 12:14:20.643] [default-akka.actor.default-dispatcher-13] 
[akka://default/system/IO-TCP/selectors/$a/0] Unbound endpoint /127.0.0.1:9999, stopping listener
2018-10-25 12:14:20 DEBUG AkkaWebSocketServerTerminateTest:108 - whenTerminated() -> terminated
2018-10-25 12:14:20 DEBUG AkkaWebSocketServerTerminateTest:112 - All connections terminated.
2018-10-25 12:14:20 DEBUG AkkaWebSocketServerTerminateTest:114 - Waiting 5 seconds before sending messages from the client to the terminated server.
2018-10-25 12:14:25 DEBUG AkkaWebSocketServerTerminateTest:76 - client sending 'message 1'
2018-10-25 12:14:25 DEBUG AkkaWebSocketServerTerminateTest:76 - client sending 'message 2'
2018-10-25 12:14:25 DEBUG AkkaWebSocketServerTerminateTest:76 - client sending 'message 3'
2018-10-25 12:14:25 DEBUG AkkaWebSocketServerTerminateTest:76 - client sending 'message 4'
2018-10-25 12:14:25 DEBUG AkkaWebSocketServerTerminateTest:53 - handling message 1
2018-10-25 12:14:25 DEBUG AkkaWebSocketServerTerminateTest:53 - handling message 2
2018-10-25 12:14:25 DEBUG AkkaWebSocketServerTerminateTest:70 - client received message: 'handled message 1'
2018-10-25 12:14:25 DEBUG AkkaWebSocketServerTerminateTest:53 - handling message 3
2018-10-25 12:14:25 DEBUG AkkaWebSocketServerTerminateTest:53 - handling message 4
2018-10-25 12:14:25 DEBUG AkkaWebSocketServerTerminateTest:70 - client received message: 'handled message 2'
2018-10-25 12:14:25 DEBUG AkkaWebSocketServerTerminateTest:70 - client received message: 'handled message 3'
2018-10-25 12:14:25 DEBUG AkkaWebSocketServerTerminateTest:70 - client received message: 'handled message 4'
2018-10-25 12:14:35 ERROR AkkaWebSocketServerTerminateTest:138 - Timeout waiting for client connection to close

1 Ответ

0 голосов
/ 26 октября 2018

Я думаю, что явное завершение невозможно, но вы можете установить akka.http.server.idle-timeout в соответствии с примечанием из их документации

Inactive WebSocket connections will be dropped according to the idle-timeout settings. In case you need to keep inactive connections alive, you can either tweak your idle-timeout or inject ‘keep-alive’ messages regularly.

...