Сведения о среде
- ОС:
uname -a
=> Darwin US_C02WG0GXHV2V 17.7.0 Darwin Kernel Version 17.7.0: Thu Jan 23 07:05:23 PST 2020; root:xnu-4570.71.69~1/RELEASE_X86_64 x86_64
- Node.js версия:
node -v
=> v10.16.2
- npm версия:
npm -v
=> 6.14.4
@google-cloud/pubsub
версия: "@google-cloud/pubsub": "^1.7.3"
Описание
Когда я пытался опубликовать sh 50k сообщений, pubsub-emulator бесконечно выдает ошибку ниже:
Стек ошибок
[pubsub] May 06, 2020 2:47:20 PM io.grpc.netty.NettyServerHandler onStreamError
[pubsub] 警告: Stream Error
[pubsub] io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write could take place
[pubsub] at io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:149)
[pubsub] at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$FlowState.cancel(DefaultHttp2RemoteFlowController.java:481)
[pubsub] at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$1.onStreamClosed(DefaultHttp2RemoteFlowController.java:105)
[pubsub] at io.netty.handler.codec.http2.DefaultHttp2Connection.notifyClosed(DefaultHttp2Connection.java:356)
[pubsub] at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.removeFromActiveStreams(DefaultHttp2Connection.java:1000)
[pubsub] at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.deactivate(DefaultHttp2Connection.java:956)
[pubsub] at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:512)
[pubsub] at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:518)
[pubsub] at io.netty.handler.codec.http2.Http2ConnectionHandler.closeStream(Http2ConnectionHandler.java:599)
[pubsub] at io.netty.handler.codec.http2.Http2ConnectionHandler.processRstStreamWriteResult(Http2ConnectionHandler.java:872)
[pubsub] at io.netty.handler.codec.http2.Http2ConnectionHandler.access$1000(Http2ConnectionHandler.java:66)
[pubsub] at io.netty.handler.codec.http2.Http2ConnectionHandler$3.operationComplete(Http2ConnectionHandler.java:796)
[pubsub] at io.netty.handler.codec.http2.Http2ConnectionHandler$3.operationComplete(Http2ConnectionHandler.java:793)
[pubsub] at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:502)
[pubsub] at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:476)
[pubsub] at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:415)
[pubsub] at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:540)
[pubsub] at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:529)
[pubsub] at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:101)
[pubsub] at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
[pubsub] at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:703)
[pubsub] at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:258)
[pubsub] at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:338)
[pubsub] at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:428)
[pubsub] at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:939)
[pubsub] at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360)
[pubsub] at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:906)
[pubsub] at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1370)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub] at io.netty.handler.logging.LoggingHandler.flush(LoggingHandler.java:265)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub] at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub] at io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:201)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub] at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:978)
[pubsub] at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:253)
[pubsub] at io.grpc.netty.WriteQueue.flush(WriteQueue.java:118)
[pubsub] at io.grpc.netty.WriteQueue.access$000(WriteQueue.java:32)
[pubsub] at io.grpc.netty.WriteQueue$1.run(WriteQueue.java:44)
[pubsub] at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
[pubsub] at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
[pubsub] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:495)
[pubsub] at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
[pubsub] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[pubsub] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[pubsub] at java.lang.Thread.run(Thread.java:748)
[pubsub]
[pubsub] May 06, 2020 2:47:20 PM io.grpc.netty.NettyServerHandler onStreamError
[pubsub] 警告: Stream Error
[pubsub] io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write could take place
[pubsub] at io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:149)
[pubsub] at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$FlowState.cancel(DefaultHttp2RemoteFlowController.java:481)
[pubsub] at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$1.onStreamClosed(DefaultHttp2RemoteFlowController.java:105)
[pubsub] at io.netty.handler.codec.http2.DefaultHttp2Connection.notifyClosed(DefaultHttp2Connection.java:356)
[pubsub] at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.removeFromActiveStreams(DefaultHttp2Connection.java:1000)
[pubsub] at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.deactivate(DefaultHttp2Connection.java:956)
[pubsub] at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:512)
[pubsub] at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:518)
[pubsub] at io.netty.handler.codec.http2.Http2ConnectionHandler.closeStream(Http2ConnectionHandler.java:599)
[pubsub] at io.netty.handler.codec.http2.Http2ConnectionHandler.processRstStreamWriteResult(Http2ConnectionHandler.java:872)
[pubsub] at io.netty.handler.codec.http2.Http2ConnectionHandler.access$1000(Http2ConnectionHandler.java:66)
[pubsub] at io.netty.handler.codec.http2.Http2ConnectionHandler$3.operationComplete(Http2ConnectionHandler.java:796)
[pubsub] at io.netty.handler.codec.http2.Http2ConnectionHandler$3.operationComplete(Http2ConnectionHandler.java:793)
[pubsub] at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:502)
[pubsub] at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:476)
[pubsub] at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:415)
[pubsub] at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:540)
[pubsub] at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:529)
[pubsub] at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:101)
[pubsub] at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
[pubsub] at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:703)
[pubsub] at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:258)
[pubsub] at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:338)
[pubsub] at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:428)
[pubsub] at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:939)
[pubsub] at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360)
[pubsub] at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:906)
[pubsub] at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1370)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub] at io.netty.handler.logging.LoggingHandler.flush(LoggingHandler.java:265)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub] at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub] at io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:201)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub] at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub] at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:978)
[pubsub] at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:253)
[pubsub] at io.grpc.netty.WriteQueue.flush(WriteQueue.java:118)
[pubsub] at io.grpc.netty.WriteQueue.access$000(WriteQueue.java:32)
[pubsub] at io.grpc.netty.WriteQueue$1.run(WriteQueue.java:44)
[pubsub] at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
[pubsub] at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
[pubsub] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:495)
[pubsub] at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
[pubsub] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[pubsub] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[pubsub] at java.lang.Thread.run(Thread.java:748)
[pubsub]
Мой издатель также выдает много таких же ошибок:
{ Error: Retry total timeout exceeded before any response was received
at repeat (/Users/ldu020/workspace/xxx/xxx-master/workflow/node_modules/google-gax/src/normalCalls/retries.ts:83:23)
at Timeout.setTimeout (/Users/ldu020/workspace/xxx/xxx-master/workflow/node_modules/google-gax/src/normalCalls/retries.ts:124:13)
at ontimeout (timers.js:436:11)
at tryOnTimeout (timers.js:300:5)
at listOnTimeout (timers.js:263:5)
at Timer.processTimers (timers.js:223:10) code: 4 }
{ Error: Retry total timeout exceeded before any response was received
at repeat (/Users/ldu020/workspace/xxx/xxx-master/workflow/node_modules/google-gax/src/normalCalls/retries.ts:83:23)
at Timeout.setTimeout (/Users/ldu020/workspace/xxx/xxx-master/workflow/node_modules/google-gax/src/normalCalls/retries.ts:124:13)
at ontimeout (timers.js:436:11)
at tryOnTimeout (timers.js:300:5)
at listOnTimeout (timers.js:263:5)
at Timer.processTimers (timers.js:223:10) code: 4 }
Мой подписчик выдает следующие ошибки:
[2020-05-06T06:29:08.932Z]Received message 46689:
Data: message payload 3998
Attributes: {}
[2020-05-06T06:29:08.932Z]Received message 46690:
Data: message payload 3999
Attributes: {}
ERROR: Error: Failed to "acknowledge" for 500 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "acknowledge" for 100 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "acknowledge" for 200 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "modifyAckDeadline" for 400 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "modifyAckDeadline" for 500 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "modifyAckDeadline" for 100 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "modifyAckDeadline" for 200 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to connect to channel. Reason: Failed to connect before the deadline
На самом деле я не знаю, что это ошибка, или я использую pubsub неправильно. Я обнаружил, что есть некоторые вопросы по StackOverflow и проблемы с GitHub:
Но не нашел решения.
Шаги по воспроизведению
Я сделал минимальный пример кода для его воспроизведения. Расскажите, пожалуйста, что происходит? Спасибо!
Репо: https://github.com/mrdulin/nodejs-gcp/tree/master/src/pubsub/pubsub-emulator
- Запустить pubsub-эмулятор
gcloud beta emulators pubsub start --project=$PROJECT_ID
Создать топи c:
npx ts-node ./publisher.ts create pubsub-emulator-t1
Создать подписку на топи c:
npx ts-node ./subscriber.ts create pubsub-emulator-t1 pubsub-emulator-t1-sub
Слушайте сообщения:
npx ts-node ./subscriber.ts receive pubsub-emulator-t1-sub
Publi sh 50k сообщений:
npx ts-node ./publisher.ts publish pubsub-emulator-t1
Дополнительная информация
Я также получил ошибку: Retry total timeout exceeded before any response was received
в производственных средах. Так что это может не быть проблемой pubsub-emulator
@google/pubsub
version: "@google-cloud/pubsub": "^1.6.0"
Вот почему я пытаюсь создать пример для его воспроизведения. Но не могу понять, что происходит. Единственный способ воспроизвести эту ошибку сейчас - опубликовать 50 тыс. Сообщений.
Тестовые примеры: https://gist.github.com/mrdulin/79f1689a9baaafaef90fcad42646bf6d