Google pubsub async Java API Чистое завершение работы - PullRequest
0 голосов
/ 07 ноября 2018

Я не могу заставить асинхронный клиент Java pubsub Google корректно завершить работу. После вызова Subscriber.stopAsync () я получаю исключения вроде этого

    14:30:07.600 [grpc-default-worker-ELG-1-2] WARN  io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise - An exception was thrown by io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler$4.operationComplete()
          java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@724c721d rejected from java.util.concurrent.ScheduledThreadPoolExecutor@36bdb610[Shutting down, pool size = 1, active threads = 1, queued tasks = 2, completed tasks = 19]
          at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) ~[?:1.8.0_144]
          at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) ~[?:1.8.0_144]
          at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326) ~[?:1.8.0_144]
          at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533) ~[?:1.8.0_144]
          at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622) ~[?:1.8.0_144]
          at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:93) ~[grpc-core-1.15.0.jar:1.15.0]
          at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:86) ~[grpc-core-1.15.0.jar:1.15.0]
          at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.onReady(ClientCallImpl.java:611) ~[grpc-core-1.15.0.jar:1.15.0]
          at io.grpc.internal.ForwardingClientStreamListener.onReady(ForwardingClientStreamListener.java:49) ~[grpc-core-1.15.0.jar:1.15.0]
          at io.grpc.internal.AbstractStream$TransportState.notifyIfReady(AbstractStream.java:298) ~[grpc-core-1.15.0.jar:1.15.0]
          at io.grpc.internal.AbstractStream$TransportState.onStreamAllocated(AbstractStream.java:237) ~[grpc-core-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.grpc.netty.NettyClientStream$TransportState.setHttp2Stream(NettyClientStream.java:249) ~[grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler$4.operationComplete(NettyClientHandler.java:521) ~[grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler$4.operationComplete(NettyClientHandler.java:509) ~[grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:103) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2CodecUtil$SimpleChannelPromiseAggregator.tryPromise(Http2CodecUtil.java:378) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2CodecUtil$SimpleChannelPromiseAggregator.trySuccess(Http2CodecUtil.java:344) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2CodecUtil$SimpleChannelPromiseAggregator.trySuccess(Http2CodecUtil.java:256) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.DelegatingChannelPromiseNotifier.operationComplete(DelegatingChannelPromiseNotifier.java:52) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.DelegatingChannelPromiseNotifier.operationComplete(DelegatingChannelPromiseNotifier.java:31) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:103) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:696) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:258) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:338) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:409) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1396) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.forceFlush(SslHandler.java:1776) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:775) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.flush(SslHandler.java:752) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:201) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.gracefulClose(NettyClientHandler.java:631) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.grpc.netty.NettyClientHandler.write(NettyClientHandler.java:300) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:1061) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.AbstractChannel.write(AbstractChannel.java:295) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.grpc.netty.WriteQueue$AbstractQueuedCommand.run(WriteQueue.java:174) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.flush(WriteQueue.java:112) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.grpc.netty.WriteQueue.access$000(WriteQueue.java:32) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.grpc.netty.WriteQueue$1.run(WriteQueue.java:44) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:464) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [grpc-netty-shaded-1.15.0.jar:1.15.0]
          at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]

Я думаю, что это косвенно вызвано тем, что потребители пытаются подтвердить сообщение, которое они обработали.

Я ожидаю, что после вызова stopAsync () с сервера не будет извлечено больше сообщений, но буферизованные на клиенте сообщения будут доставлены моему обратному вызову, и я смогу получить или проверить эти сообщения и любые другие. Я нахожусь в процессе обработки, но я не могу заставить эту работу.

Я не вижу других методов в подписке, которые я мог бы вызвать для постепенного завершения работы, я что-то упустил?

Очевидно, что в конечном итоге эти сообщения будут повторно доставлены, но я бы предпочел обработать сообщения в моем буфере перед выключением, и я бы предпочел избегать "обычных" исключений в журналах.

1 Ответ

0 голосов
/ 07 ноября 2018

Способ, которым работает stopAsync(), заключается в том, что он запускает «последовательность» завершения работы, но сразу возвращается. Потенциально оставляю какую-то работу за сценой. попробуйте вызвать subscriber.stopAsync().awaitTerminated(), чтобы программа ожидала, пока служба не перейдет в состояние «Завершено».

...