pubsub-emulator выдает ошибку, и издатель выдает сообщение «Общее время ожидания повторной попытки превышено до получения любого ответа», когда publi sh 50k сообщений - PullRequest
0 голосов
/ 07 мая 2020

Сведения о среде

  • ОС: uname -a => Darwin US_C02WG0GXHV2V 17.7.0 Darwin Kernel Version 17.7.0: Thu Jan 23 07:05:23 PST 2020; root:xnu-4570.71.69~1/RELEASE_X86_64 x86_64
  • Node.js версия: node -v => v10.16.2
  • npm версия: npm -v => 6.14.4
  • @google-cloud/pubsub версия: "@google-cloud/pubsub": "^1.7.3"

Описание

Когда я пытался опубликовать sh 50k сообщений, pubsub-emulator бесконечно выдает ошибку ниже:

Стек ошибок

[pubsub] May 06, 2020 2:47:20 PM io.grpc.netty.NettyServerHandler onStreamError
[pubsub] 警告: Stream Error
[pubsub] io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write could take place
[pubsub]        at io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:149)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$FlowState.cancel(DefaultHttp2RemoteFlowController.java:481)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$1.onStreamClosed(DefaultHttp2RemoteFlowController.java:105)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection.notifyClosed(DefaultHttp2Connection.java:356)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.removeFromActiveStreams(DefaultHttp2Connection.java:1000)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.deactivate(DefaultHttp2Connection.java:956)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:512)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:518)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler.closeStream(Http2ConnectionHandler.java:599)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler.processRstStreamWriteResult(Http2ConnectionHandler.java:872)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler.access$1000(Http2ConnectionHandler.java:66)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler$3.operationComplete(Http2ConnectionHandler.java:796)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler$3.operationComplete(Http2ConnectionHandler.java:793)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:502)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:476)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:415)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:540)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:529)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:101)
[pubsub]        at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
[pubsub]        at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:703)
[pubsub]        at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:258)
[pubsub]        at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:338)
[pubsub]        at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:428)
[pubsub]        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:939)
[pubsub]        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360)
[pubsub]        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:906)
[pubsub]        at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1370)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]        at io.netty.handler.logging.LoggingHandler.flush(LoggingHandler.java:265)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]        at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:201)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]        at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:978)
[pubsub]        at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:253)
[pubsub]        at io.grpc.netty.WriteQueue.flush(WriteQueue.java:118)
[pubsub]        at io.grpc.netty.WriteQueue.access$000(WriteQueue.java:32)
[pubsub]        at io.grpc.netty.WriteQueue$1.run(WriteQueue.java:44)
[pubsub]        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
[pubsub]        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
[pubsub]        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:495)
[pubsub]        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
[pubsub]        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[pubsub]        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[pubsub]        at java.lang.Thread.run(Thread.java:748)
[pubsub] 
[pubsub] May 06, 2020 2:47:20 PM io.grpc.netty.NettyServerHandler onStreamError
[pubsub] 警告: Stream Error
[pubsub] io.netty.handler.codec.http2.Http2Exception$StreamException: Stream closed before write could take place
[pubsub]        at io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:149)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$FlowState.cancel(DefaultHttp2RemoteFlowController.java:481)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController$1.onStreamClosed(DefaultHttp2RemoteFlowController.java:105)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection.notifyClosed(DefaultHttp2Connection.java:356)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.removeFromActiveStreams(DefaultHttp2Connection.java:1000)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection$ActiveStreams.deactivate(DefaultHttp2Connection.java:956)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:512)
[pubsub]        at io.netty.handler.codec.http2.DefaultHttp2Connection$DefaultStream.close(DefaultHttp2Connection.java:518)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler.closeStream(Http2ConnectionHandler.java:599)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler.processRstStreamWriteResult(Http2ConnectionHandler.java:872)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler.access$1000(Http2ConnectionHandler.java:66)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler$3.operationComplete(Http2ConnectionHandler.java:796)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler$3.operationComplete(Http2ConnectionHandler.java:793)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:502)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:476)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:415)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:540)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:529)
[pubsub]        at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:101)
[pubsub]        at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48)
[pubsub]        at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:703)
[pubsub]        at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:258)
[pubsub]        at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:338)
[pubsub]        at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:428)
[pubsub]        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:939)
[pubsub]        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:360)
[pubsub]        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:906)
[pubsub]        at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1370)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]        at io.netty.handler.logging.LoggingHandler.flush(LoggingHandler.java:265)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]        at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]        at io.netty.handler.codec.http2.Http2ConnectionHandler.flush(Http2ConnectionHandler.java:201)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:739)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:731)
[pubsub]        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:717)
[pubsub]        at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:978)
[pubsub]        at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:253)
[pubsub]        at io.grpc.netty.WriteQueue.flush(WriteQueue.java:118)
[pubsub]        at io.grpc.netty.WriteQueue.access$000(WriteQueue.java:32)
[pubsub]        at io.grpc.netty.WriteQueue$1.run(WriteQueue.java:44)
[pubsub]        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
[pubsub]        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
[pubsub]        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:495)
[pubsub]        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
[pubsub]        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[pubsub]        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[pubsub]        at java.lang.Thread.run(Thread.java:748)
[pubsub] 

Мой издатель также выдает много таких же ошибок:

{ Error: Retry total timeout exceeded before any response was received
    at repeat (/Users/ldu020/workspace/xxx/xxx-master/workflow/node_modules/google-gax/src/normalCalls/retries.ts:83:23)
    at Timeout.setTimeout (/Users/ldu020/workspace/xxx/xxx-master/workflow/node_modules/google-gax/src/normalCalls/retries.ts:124:13)
    at ontimeout (timers.js:436:11)
    at tryOnTimeout (timers.js:300:5)
    at listOnTimeout (timers.js:263:5)
    at Timer.processTimers (timers.js:223:10) code: 4 }
{ Error: Retry total timeout exceeded before any response was received
    at repeat (/Users/ldu020/workspace/xxx/xxx-master/workflow/node_modules/google-gax/src/normalCalls/retries.ts:83:23)
    at Timeout.setTimeout (/Users/ldu020/workspace/xxx/xxx-master/workflow/node_modules/google-gax/src/normalCalls/retries.ts:124:13)
    at ontimeout (timers.js:436:11)
    at tryOnTimeout (timers.js:300:5)
    at listOnTimeout (timers.js:263:5)
    at Timer.processTimers (timers.js:223:10) code: 4 }

Мой подписчик выдает следующие ошибки:

[2020-05-06T06:29:08.932Z]Received message 46689:
Data: message payload 3998
Attributes: {}
[2020-05-06T06:29:08.932Z]Received message 46690:
Data: message payload 3999
Attributes: {}
ERROR: Error: Failed to "acknowledge" for 500 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "acknowledge" for 100 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "acknowledge" for 200 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "modifyAckDeadline" for 400 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "modifyAckDeadline" for 500 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "modifyAckDeadline" for 100 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to "modifyAckDeadline" for 200 message(s). Reason: 4 DEADLINE_EXCEEDED: Deadline exceeded
ERROR: Error: Failed to connect to channel. Reason: Failed to connect before the deadline

На самом деле я не знаю, что это ошибка, или я использую pubsub неправильно. Я обнаружил, что есть некоторые вопросы по StackOverflow и проблемы с GitHub:

Но не нашел решения.

Шаги по воспроизведению

Я сделал минимальный пример кода для его воспроизведения. Расскажите, пожалуйста, что происходит? Спасибо!

Репо: https://github.com/mrdulin/nodejs-gcp/tree/master/src/pubsub/pubsub-emulator

  1. Запустить pubsub-эмулятор
gcloud beta emulators pubsub start --project=$PROJECT_ID
Создать топи c:
npx ts-node ./publisher.ts create pubsub-emulator-t1
Создать подписку на топи c:
npx ts-node ./subscriber.ts create pubsub-emulator-t1 pubsub-emulator-t1-sub
Слушайте сообщения:
npx ts-node ./subscriber.ts receive pubsub-emulator-t1-sub
Publi sh 50k сообщений:
npx ts-node ./publisher.ts publish pubsub-emulator-t1

Дополнительная информация

Я также получил ошибку: Retry total timeout exceeded before any response was received в производственных средах. Так что это может не быть проблемой pubsub-emulator

@google/pubsub version: "@google-cloud/pubsub": "^1.6.0"

Вот почему я пытаюсь создать пример для его воспроизведения. Но не могу понять, что происходит. Единственный способ воспроизвести эту ошибку сейчас - опубликовать 50 тыс. Сообщений.

Тестовые примеры: https://gist.github.com/mrdulin/79f1689a9baaafaef90fcad42646bf6d

...