Проблема с подключением к GCP Java Pubsub Java Spring Cloud - PullRequest
0 голосов
/ 07 сентября 2018

У меня есть приложение Spring Boot, работающее в Google Kubernetes Engine, которое подключается к pubsub. Недавно мы начали видеть эту проблему, что приложение перестает получать сообщения из подписки через некоторое время. Если мы перезапустим приложение, оно подключится нормально и начнет собирать сообщения.

Я включил ведение журнала в DEBUG, и он показывает периодическое отключение, и попытки повторного подключения успешно предпринимаются, и он продолжает получать сообщения после этого, но в какой-то момент он просто молча прекращает прием сообщений. В моей локальной сети я заново создал сценарий, запустив службу, она получает сообщения, затем я отключил сеть, и после повторного подключения сети вышеприведенная логика keepAlive / retry по какой-то причине не срабатывает. В одном сценарии это произошло через пару часов, и все было хорошо.

Таким образом, вопрос в том, почему логика повторения не срабатывает при потере соединения (в этом тестируемом случае отключено и повторно подключено сетевое соединение).

Когда я смогу присоединиться, я опубликую тот же вопрос в группе обсуждения Google: https://groups.google.com/forum/#!searchin/cloud-pubsub-discuss

Это может быть что-то простое, например, какая-то конфигурация, но я пока не могу ее найти.

Моя настройка для воссоздания этого была следующей:

Базовое приложение Spring Boot после этого урока: https://spring.io/guides/gs/messaging-gcp-pubsub/

Использование Spring Boot 2.0.4.RELEASE, spring-cloud-gcp-starter-pubsub: 1.0.0.RELEASE и spring-интеграция-ядро: 5.0.7.RELEASE.

У меня есть несколько подписок, так как это наш вариант использования.

Ошибка повторного подключения, которая успешно подключается, приведена ниже (но эта ошибка не вызывается, когда сеть была отключена и повторно подключена вручную):

DEBUG 59005 --- [pool-2-thread-1] c.g.c.p.v.StreamingSubscriberConnection: поток закрыт с повторяющимся исключением; переподключится

io.grpc.StatusRuntimeException: НЕДОСТУПЕН: службе не удалось выполнить ваш запрос. Пожалуйста, попробуйте еще раз. [Код = 8a75] в io.grpc.Status.asRuntimeException (Status.java:526) ~ [grpc-core-1.13.1.jar: 1.13.1] на io.grpc.stub.ClientCalls $ StreamObserverToCallListenerAdapter.onClose (ClientCalls.java:420) [grpc-stub-1.13.1.jar: 1.13.1] в io.grpc.PartialForwardingClientCallListener.onClose (PartialForwardingClientCallListener.java:39) [grpc-core-1.13.1.jar: 1.13.1] на io.grpc.ForwardingClientCallListener.onClose (ForwardingClientCallListener.java:23) [grpc-core-1.13.1.jar: 1.13.1] в io.grpc.ForwardingClientCallListener $ SimpleForwardingClientCallListener.onClose (ForwardingClientCallListener.java:40) [grpc-core-1.13.1.jar: 1.13.1] в io.grpc.internal.CensusStatsModule $ StatsClientInterceptor $ 1 $ 1.onClose (CensusStatsModule.java:684) [grpc-core-1.13.1.jar: 1.13.1] в io.grpc.PartialForwardingClientCallListener.onClose (PartialForwardingClientCallListener.java:39) [grpc-core-1.13.1.jar: 1.13.1] на io.grpc.ForwardingClientCallListener.onClose (ForwardingClientCallListener.java:23) [grpc-core-1.13.1.jar: 1.13.1] в io.grpc.ForwardingClientCallListener $ SimpleForwardingClientCallListener.onClose (ForwardingClientCallListener.java:40) [grpc-core-1.13.1.jar: 1.13.1] в io.grpc.internal.CensusTracingModule $ TracingClientInterceptor $ 1 $ 1.onClose (CensusTracingModule.java:403) [grpc-core-1.13.1.jar: 1.13.1] на io.grpc.internal.ClientCallImpl.closeObserver (ClientCallImpl.java:459) [grpc-core-1.13.1.jar: 1.13.1] на io.grpc.internal.ClientCallImpl.access $ 300 (ClientCallImpl.java:63) [grpc-core-1.13.1.jar: 1.13.1] в io.grpc.internal.ClientCallImpl $ ClientStreamListenerImpl.close (ClientCallImpl.java:546) [grpc-core-1.13.1.jar: 1.13.1] на io.grpc.internal.ClientCallImpl $ ClientStreamListenerImpl.access $ 600 (ClientCallImpl.java:467) [grpc-core-1.13.1.jar: 1.13.1] в io.grpc.internal.ClientCallImpl $ ClientStreamListenerImpl $ 1StreamClosed.runInContext (ClientCallImpl.java:584) [grpc-core-1.13.1.jar: 1.13.1]в io.grpc.internal.ContextRunnable.run (ContextRunnable.java:37) [grpc-core-1.13.1.jar: 1.13.1] на io.grpc.internal.SerializingExecutor.run (SerializingExecutor.java:123) [grpc-core-1.13.1.jar: 1.13.1] в java.util.concurrent.Executors $ RunnableAdapter.call (Executors.java:511) [na: 1.8.0_161] at java.util.concurrent.FutureTask.run (FutureTask.java:266) [na: 1.8.0_161] в java.util.concurrent.ScheduledThreadPoolExecutor $ ScheduledFutureTask.access $ 201 (ScheduledThreadPoolExecutor.java:180) [na: 1.8.0_161] в java.util.concurrent.ScheduledThreadPoolExecutor $ ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293) [na: 1.8.0_161] в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149) [na: 1.8.0_161] в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) [na: 1.8.0_161] at java.lang.Thread.run (Thread.java:748) [na: 1.8.0_161]

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.AckMode;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import org.springframework.cloud.gcp.pubsub.support.GcpPubSubHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import com.google.cloud.pubsub.v1.AckReplyConsumer;

@SpringBootApplication
public class PubsubTestApplication {

  public static void main(String[] args) {
    SpringApplication.run(PubsubTestApplication.class, args);
  }

  @Bean
  public PubSubInboundChannelAdapter messageChannelAdapter(@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
      PubSubTemplate pubSubTemplate) {
    PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate,
        "subscription-1");
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);

    return adapter;
  }


  @Bean
  public PubSubInboundChannelAdapter messageChannelAdapterA(@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
      PubSubTemplate pubSubTemplate) {
    PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate,
        "subscription-2");
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);

    return adapter;
  }

  @Bean
  public MessageChannel pubsubInputChannel() {
    return new DirectChannel();
  }

  @Bean
  @ServiceActivator(inputChannel = "pubsubInputChannel")
  public MessageHandler messageReceiver() {
    return message -> {
      System.out.println("Message arrived! Payload: " + new String((byte[])message.getPayload()));
      AckReplyConsumer consumer =
          (AckReplyConsumer) message.getHeaders().get(GcpPubSubHeaders.ACKNOWLEDGEMENT);
      consumer.ack();
    };
  }
}
...