PubSubInboundChannelAdapter прекращает получать сообщения после 4-го сообщения - PullRequest
1 голос
/ 30 октября 2019

Я создал упрощенный пример, который воспроизводит мою реальную проблему.

Мой пример принимает от Google pub / sub, регистрирует его и отправляет подтверждение обратно на Pub/Sub

Конфиг:

@Slf4j
@Configuration
public class MyConfig implements FlowSupport {

    private final AppProperties properties;

    public MyConfig(AppProperties properties) {
        this.properties = properties;
    }

    @Bean
    public JacksonFactory jacksonFactory() {
        return JacksonFactory.getDefaultInstance();
    }
    @Bean
    public MessageChannel bucketNotificationChannel() {
        return MessageChannels.direct("input-notification-channel").get();
    }

    @Bean
    public PubSubInboundChannelAdapter messageChannelAdapter(PubSubTemplate template) {
        var adapter = new PubSubInboundChannelAdapter(template, properties.getBucketTopicSubscription());
        adapter.setOutputChannel(bucketNotificationChannel());
        adapter.setErrorChannel(errorChannel());
        adapter.setAckMode(AckMode.MANUAL);
        adapter.setPayloadType(Notification.class);
        return adapter;
    }

    @Bean
    @Override
    public MessageChannel idempotentDiscardChannel() {
        return MessageChannels.direct("idempotent-discard-channel").get();
    }

    @Bean
    public MessageChannel errorChannel() {
        return MessageChannels.direct("global-error-channel").get();
    }

    @Bean
    @Override
    public ConcurrentMetadataStore idempotencyStore() {
        return new SimpleMetadataStore();
    }

    @Bean
    public IntegrationFlow bucketNotificationFlow(
            EmptyNotificationHandler handler,
            IntegrationFlow acknowledgementFlow
    ) {
        return flow -> flow.channel(bucketNotificationChannel())
                .handle(handler)
                .log(INFO, "Handler finished", m -> {
                    return "got" + m;
                }).gateway(acknowledgementFlow);
    }

    @Bean
    public IntegrationFlow acknowledgementFlow(PubSubAckHandler handler) {
        return flow -> flow.log(DEBUG, "acknowledgementFlow", m -> "Handling acknowledgement for message: " + m)
                .handle(handler);
    }
}

NotificationHandler:

@Component
@Slf4j
public class EmptyNotificationHandler implements GenericHandler<Notification> {
    private final ResourceLoader loader;

    public EmptyNotificationHandler(ResourceLoader loader) {
        this.loader = loader;
    }

    @Override
    public Resource handle(Notification payload, MessageHeaders headers) {
        try {
            return new Resource() {
                @Override
                public boolean exists() {
                    return false;
                }    
                ...
            };

        } catch (Exception e) {
            log.error("Error occurred:", e);
            return null;
        }
    }
}

AckHandler :

@Component
public class MyPubSubAckHandler implements MessageHandler {

    private final ConcurrentMetadataStore idempotencyStore;


    public MyPubSubAckHandler(ConcurrentMetadataStore idempotencyStore, MeterRegistry meterRegistry) {
        this.idempotencyStore = idempotencyStore;
    }

    @Override
    public void handleMessage(@NonNull Message<?> message) throws MessagingException {
        Message<?> targetMessage = MessageUtils.unwrap(message);
        var pubSubMessage = getOriginalMessage(targetMessage);
        if (pubSubMessage == null) {
            removeFromIdempotentStore(targetMessage);               
            return;
        }
        var generation = targetMessage.getHeaders().get(OBJECT_GENERATION_HEADER);
        if (message instanceof ErrorMessage || message.getPayload() instanceof Throwable) {
            pubSubMessage.nack().addCallback(
                    v -> {
                        removeFromIdempotentStore(targetMessage);
                        log.error("Error message was nacked - {}", generation);
                    },
                    e -> {
                        removeFromIdempotentStore(targetMessage);
                        log.error("Failed to nack message {}", generation, e);
                    }
            );
        } else {
            pubSubMessage.ack().addCallback(
                    v -> {
                        removeFromIdempotentStore(targetMessage);
                        log.info("Acknowledged message - {}", generation);
                    },
                    e -> {
                        removeFromIdempotentStore(targetMessage);
                        log.error("Failed to acknowledge message - {}", generation, e);
                    }
            );
        }
    }

    @SuppressWarnings({"RedundantSuppression", "unchecked"}) //IDEMPOTENCY_HEADER has Set<String> underneath
    private void removeFromIdempotentStore(Message<?> targetMessage) {
        Optional.ofNullable(targetMessage.getHeaders().get(IDEMPOTENCY_HEADER, Set.class)).stream()
                .flatMap(Collection::stream)
                .forEach(key -> idempotencyStore.remove(String.valueOf(key)));
    }
}

Когда яотправить первое сообщение все в порядке - я вижу сообщение в журнале и подтверждение отправлено на pubsub. Кроме того, я вижу, что количество непрочитанных сообщений на странице подписки gcp равно 0.

Но после нескольких успешных сообщений мое приложение просто перестает получать сообщения. Я потратил много времени на отладку и смог выяснить следующее:

Несколько потоков зависает на линии: org.springframework.messaging.core.GenericMessagingTemplate.TeilitaryReplyChannel # receive (long): 314

this.replyLatch.await();

ThreadDump:

"gcp-pubsub-subscriber1@7980" prio=5 tid=0x1e nid=NA waiting
  java.lang.Thread.State: WAITING
      at jdk.internal.misc.Unsafe.park(Unsafe.java:-1)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1039)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345)
      at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:232)
      at org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel.receive(GenericMessagingTemplate.java:314)
      at org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel.receive(GenericMessagingTemplate.java:306)
      at org.springframework.messaging.core.GenericMessagingTemplate.doReceive(GenericMessagingTemplate.java:207)
      at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:240)
      at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:47)
      at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:46)
      at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97)
      at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:503)
      at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:474)
      at org.springframework.integration.gateway.GatewayProxyFactoryBean.sendOrSendAndReceive(GatewayProxyFactoryBean.java:573)
      at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:508)
      at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:478)
      at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:468)
      at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
      at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
      at com.sun.proxy.$Proxy110.exchange(Unknown Source:-1)
      at org.springframework.integration.gateway.GatewayMessageHandler.handleRequestMessage(GatewayMessageHandler.java:88)
      at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127)
      at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
      at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
      at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
      at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
      at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
      at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
      at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
      at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
      at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
      at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
      at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
      at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:444)
      at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:318)
      at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:266)
      at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:229)
      at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:133)
      at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
      at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
      at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
      at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
      at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
      at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
      at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
      at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
      at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
      at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
      at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
      at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198)
      at org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter.consumeMessage(PubSubInboundChannelAdapter.java:148)
      at org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter$$Lambda$859.600858818.accept(Unknown Source:-1)
      at org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberTemplate.lambda$subscribeAndConvert$1(PubSubSubscriberTemplate.java:152)
      at org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberTemplate$$Lambda$860.1495761010.receiveMessage(Unknown Source:-1)
      at com.google.cloud.pubsub.v1.MessageDispatcher$4.run(MessageDispatcher.java:379)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
      at java.util.concurrent.FutureTask.run(FutureTask.java:-1)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.lang.Thread.run(Thread.java:834)

Я полагаю, что это причина, потому что внутри com.google.cloud.pubsub.v1.MessageDispatcher # processOutstandingMessage:

строка:

  executor.execute(deliverMessageTask);

выполняется, но deliveryMessageTask не выполняется threadExecutor.

На мой взгляд, это похоже на ошибку в библиотеке, но это может быть злоупотребление библиотекой. В любом случае я ищу любое решение / обходной путь, чтобы избежать этой проблемы.

Версии библиотеки:

Я использую:
spring-boot 2.2.0.RELEASE
springCloudVersion = "Greenwich.SR3"

com.google.cloud:google-cloud-pubsub:1.98.0

PS

Я знаю, что могу увеличить размер пула потоков, например:

spring:
  cloud:
    gcp:     
      pubsub:
        enabled: true
        subscriber:
          executor-threads: 100

Но я не думаю, что этохорошая идея.

1 Ответ

1 голос
/ 30 октября 2019

Ваша проблема здесь:

.gateway(acknowledgementFlow);

Это означает request-reply, и мы не можем догадаться, что ваш acknowledgementFlow - это one-way поток. Я вижу это по вашей реализации MyPubSubAckHandler, которая возвращает void для своей реализации handleMessage(). Таким образом, шлюз ожидает ответа, но реальный подпоток никогда не вернет никакого результата. Поэтому ожидание ответных цепочек застряло, и в конечном итоге ваше приложение завершится ошибкой.

Одним из решений является сделать GatewayEndpointSpec.replyTimeout() как 0. Таким образом, ваш подпоток void не будет блокировать основной поток для потенциального ответа.

Другой способ - просто не использовать gateway(), а только содержимое подпотока непосредственно в основном потоке. Это действительно не похоже на то, что вы ожидаете некоторого ответа, так что это должно сработать для вас:

  return flow -> flow.channel(bucketNotificationChannel())
            .handle(handler)
            .log(INFO, "Handler finished", m -> {
                return "got" + m;
            })
            .log(DEBUG, "acknowledgementFlow", m -> "Handling acknowledgement for message: " + m)
            .handle(pubSubAckHandler);
...