У меня есть приложение Spring Boot, работающее в Google Kubernetes Engine, которое подключается к pubsub. Недавно мы начали видеть эту проблему, что приложение перестает получать сообщения из подписки через некоторое время. Если мы перезапустим приложение, оно подключится нормально и начнет собирать сообщения.
Я включил ведение журнала в DEBUG, и он показывает периодическое отключение, и попытки повторного подключения успешно предпринимаются, и он продолжает получать сообщения после этого, но в какой-то момент он просто молча прекращает прием сообщений. В моей локальной сети я заново создал сценарий, запустив службу, она получает сообщения, затем я отключил сеть, и после повторного подключения сети вышеприведенная логика keepAlive / retry по какой-то причине не срабатывает. В одном сценарии это произошло через пару часов, и все было хорошо.
Таким образом, вопрос в том, почему логика повторения не срабатывает при потере соединения (в этом тестируемом случае отключено и повторно подключено сетевое соединение).
Когда я смогу присоединиться, я опубликую тот же вопрос в группе обсуждения Google: https://groups.google.com/forum/#!searchin/cloud-pubsub-discuss
Это может быть что-то простое, например, какая-то конфигурация, но я пока не могу ее найти.
Моя настройка для воссоздания этого была следующей:
Базовое приложение Spring Boot после этого урока: https://spring.io/guides/gs/messaging-gcp-pubsub/
Использование Spring Boot 2.0.4.RELEASE, spring-cloud-gcp-starter-pubsub: 1.0.0.RELEASE и spring-интеграция-ядро: 5.0.7.RELEASE.
У меня есть несколько подписок, так как это наш вариант использования.
Ошибка повторного подключения, которая успешно подключается, приведена ниже (но эта ошибка не вызывается, когда сеть была отключена и повторно подключена вручную):
DEBUG 59005 --- [pool-2-thread-1] c.g.c.p.v.StreamingSubscriberConnection: поток закрыт с повторяющимся исключением; переподключится
io.grpc.StatusRuntimeException: НЕДОСТУПЕН: службе не удалось выполнить ваш запрос. Пожалуйста, попробуйте еще раз. [Код = 8a75]
в io.grpc.Status.asRuntimeException (Status.java:526) ~ [grpc-core-1.13.1.jar: 1.13.1]
на io.grpc.stub.ClientCalls $ StreamObserverToCallListenerAdapter.onClose (ClientCalls.java:420) [grpc-stub-1.13.1.jar: 1.13.1]
в io.grpc.PartialForwardingClientCallListener.onClose (PartialForwardingClientCallListener.java:39) [grpc-core-1.13.1.jar: 1.13.1]
на io.grpc.ForwardingClientCallListener.onClose (ForwardingClientCallListener.java:23) [grpc-core-1.13.1.jar: 1.13.1]
в io.grpc.ForwardingClientCallListener $ SimpleForwardingClientCallListener.onClose (ForwardingClientCallListener.java:40) [grpc-core-1.13.1.jar: 1.13.1]
в io.grpc.internal.CensusStatsModule $ StatsClientInterceptor $ 1 $ 1.onClose (CensusStatsModule.java:684) [grpc-core-1.13.1.jar: 1.13.1]
в io.grpc.PartialForwardingClientCallListener.onClose (PartialForwardingClientCallListener.java:39) [grpc-core-1.13.1.jar: 1.13.1]
на io.grpc.ForwardingClientCallListener.onClose (ForwardingClientCallListener.java:23) [grpc-core-1.13.1.jar: 1.13.1]
в io.grpc.ForwardingClientCallListener $ SimpleForwardingClientCallListener.onClose (ForwardingClientCallListener.java:40) [grpc-core-1.13.1.jar: 1.13.1]
в io.grpc.internal.CensusTracingModule $ TracingClientInterceptor $ 1 $ 1.onClose (CensusTracingModule.java:403) [grpc-core-1.13.1.jar: 1.13.1]
на io.grpc.internal.ClientCallImpl.closeObserver (ClientCallImpl.java:459) [grpc-core-1.13.1.jar: 1.13.1]
на io.grpc.internal.ClientCallImpl.access $ 300 (ClientCallImpl.java:63) [grpc-core-1.13.1.jar: 1.13.1]
в io.grpc.internal.ClientCallImpl $ ClientStreamListenerImpl.close (ClientCallImpl.java:546) [grpc-core-1.13.1.jar: 1.13.1]
на io.grpc.internal.ClientCallImpl $ ClientStreamListenerImpl.access $ 600 (ClientCallImpl.java:467) [grpc-core-1.13.1.jar: 1.13.1]
в io.grpc.internal.ClientCallImpl $ ClientStreamListenerImpl $ 1StreamClosed.runInContext (ClientCallImpl.java:584) [grpc-core-1.13.1.jar: 1.13.1]в io.grpc.internal.ContextRunnable.run (ContextRunnable.java:37) [grpc-core-1.13.1.jar: 1.13.1]
на io.grpc.internal.SerializingExecutor.run (SerializingExecutor.java:123) [grpc-core-1.13.1.jar: 1.13.1]
в java.util.concurrent.Executors $ RunnableAdapter.call (Executors.java:511) [na: 1.8.0_161]
at java.util.concurrent.FutureTask.run (FutureTask.java:266) [na: 1.8.0_161]
в java.util.concurrent.ScheduledThreadPoolExecutor $ ScheduledFutureTask.access $ 201 (ScheduledThreadPoolExecutor.java:180) [na: 1.8.0_161]
в java.util.concurrent.ScheduledThreadPoolExecutor $ ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293) [na: 1.8.0_161]
в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) [na: 1.8.0_161]
в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) [na: 1.8.0_161]
at java.lang.Thread.run (Thread.java:748) [na: 1.8.0_161]
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.AckMode;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import org.springframework.cloud.gcp.pubsub.support.GcpPubSubHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
@SpringBootApplication
public class PubsubTestApplication {
public static void main(String[] args) {
SpringApplication.run(PubsubTestApplication.class, args);
}
@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
PubSubTemplate pubSubTemplate) {
PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate,
"subscription-1");
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
return adapter;
}
@Bean
public PubSubInboundChannelAdapter messageChannelAdapterA(@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
PubSubTemplate pubSubTemplate) {
PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate,
"subscription-2");
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
return adapter;
}
@Bean
public MessageChannel pubsubInputChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "pubsubInputChannel")
public MessageHandler messageReceiver() {
return message -> {
System.out.println("Message arrived! Payload: " + new String((byte[])message.getPayload()));
AckReplyConsumer consumer =
(AckReplyConsumer) message.getHeaders().get(GcpPubSubHeaders.ACKNOWLEDGEMENT);
consumer.ack();
};
}
}