gRPC исключение "вызов закрыт" при вызове onNext на сервере - PullRequest
0 голосов
/ 21 февраля 2019

К сожалению, это происходит спорадически в производстве, но я не могу надежно воспроизвести.

Сервер gRPC раздает небольшие, но частые обновления нескольким клиентам.Каждый клиент делает несколько запросов одного и того же вызова с разными параметрами.Это постоянная потоковая передача данных, никогда не происходит onComplete с сервера.

При вызове onNext я получаю следующую ошибку:

Feb 20, 2019 10:13:03 AM io.grpc.internal.SerializingExecutor run
SEVERE: Exception while executing runnable concord.grpc.UncaughtExceptionServerInterceptor$SerializingServerCall$$Lambda$47/1901113624@2b4ca8e3
java.lang.IllegalStateException: call is closed
at com.google.common.base.Preconditions.checkState(Preconditions.java:174)
at io.grpc.internal.ServerCallImpl.sendMessage(ServerCallImpl.java:124)
at io.grpc.ForwardingServerCall.sendMessage(ForwardingServerCall.java:32)
at concord.grpc.UncaughtExceptionServerInterceptor$SerializingServerCall.access$1001(UncaughtExceptionServerInterceptor.java:142)
at concord.grpc.UncaughtExceptionServerInterceptor$SerializingServerCall.lambda$sendMessage$0(UncaughtExceptionServerInterceptor.java:158)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:93)
at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:86)
at concord.grpc.UncaughtExceptionServerInterceptor$SerializingServerCall.sendMessage(UncaughtExceptionServerInterceptor.java:158)
at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:341)
at myPackage$helper.lambda$calculateChangesAndNotifyObservers$1(myCode.java:229)

Список наблюдателей сохраняется на основе запросов gRPC на подписку на данные.Несколько клиентов могут запрашивать одинаковые обновления, поэтому каждый входящий клиент добавляется в список для соответствующих данных.

Если они отменяются, они удаляются из списка с помощью:

if( responseObserver instanceof ServerCallStreamObserver<?> )
{
    ((ServerCallStreamObserver<?>) responseObserver).setOnCancelHandler( () ->
    {
        synchronized( _lastSnapshot )
        {
            _observers.remove( responseObserver );
        }                       
    } );
}

Разумно ли ловить исключение, выбрасываемое из onNext, и просто удалять клиента из списка?Или есть лучший способ обнаружения?Или я должен решить основную проблему?

...