К сожалению, это происходит спорадически в производстве, но я не могу надежно воспроизвести.
Сервер gRPC раздает небольшие, но частые обновления нескольким клиентам.Каждый клиент делает несколько запросов одного и того же вызова с разными параметрами.Это постоянная потоковая передача данных, никогда не происходит onComplete с сервера.
При вызове onNext я получаю следующую ошибку:
Feb 20, 2019 10:13:03 AM io.grpc.internal.SerializingExecutor run
SEVERE: Exception while executing runnable concord.grpc.UncaughtExceptionServerInterceptor$SerializingServerCall$$Lambda$47/1901113624@2b4ca8e3
java.lang.IllegalStateException: call is closed
at com.google.common.base.Preconditions.checkState(Preconditions.java:174)
at io.grpc.internal.ServerCallImpl.sendMessage(ServerCallImpl.java:124)
at io.grpc.ForwardingServerCall.sendMessage(ForwardingServerCall.java:32)
at concord.grpc.UncaughtExceptionServerInterceptor$SerializingServerCall.access$1001(UncaughtExceptionServerInterceptor.java:142)
at concord.grpc.UncaughtExceptionServerInterceptor$SerializingServerCall.lambda$sendMessage$0(UncaughtExceptionServerInterceptor.java:158)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:456)
at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:93)
at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:86)
at concord.grpc.UncaughtExceptionServerInterceptor$SerializingServerCall.sendMessage(UncaughtExceptionServerInterceptor.java:158)
at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:341)
at myPackage$helper.lambda$calculateChangesAndNotifyObservers$1(myCode.java:229)
Список наблюдателей сохраняется на основе запросов gRPC на подписку на данные.Несколько клиентов могут запрашивать одинаковые обновления, поэтому каждый входящий клиент добавляется в список для соответствующих данных.
Если они отменяются, они удаляются из списка с помощью:
if( responseObserver instanceof ServerCallStreamObserver<?> )
{
((ServerCallStreamObserver<?>) responseObserver).setOnCancelHandler( () ->
{
synchronized( _lastSnapshot )
{
_observers.remove( responseObserver );
}
} );
}
Разумно ли ловить исключение, выбрасываемое из onNext, и просто удалять клиента из списка?Или есть лучший способ обнаружения?Или я должен решить основную проблему?