Подпишитесь на Google Pub / Sub Topi c на X секунд и остановитесь, если сообщения не получены - PullRequest
1 голос
/ 01 мая 2020

Я использую Google Pub / Sub Java SDK , чтобы подписаться на топи c. Я хочу сделать следующее:

  • Начать прослушивание topi c в течение X секунд (предположим, 25 секунд)
  • Если сообщение получено, прекратите прослушивание и обработать сообщение (это может занять несколько минут)
  • После обработки сообщения продолжить прослушивание topi c еще раз в течение 25 секунд
  • Если сообщение не получено в течение 25 секунд, то окончательно прекратить прослушивание

Кажется, я ничего не могу найти в документации и только. Может быть, это просто невозможно?

Вот как я запускаю подписчика:

// Create a subscriber bound to the asynchronous message receiver
subscriber = Subscriber.newBuilder(projectSubscriptionName, new PubSubRoeMessageReceiver()).build();

// Start subscriber
subscriber.startAsync().awaitRunning();

// Allow the subscriber to run indefinitely unless an unrecoverable error occurs.
subscriber.awaitTerminated();

А вот так выглядит мой приемник сообщений:

public class PubSubRoeMessageReceiver implements MessageReceiver {
    @Override
    public void receiveMessage(PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) {
        // Acknowledge message
        System.out.println("Acknowledge message");
        ackReplyConsumer.ack();

        // TODO: stop the subscriber

        // TODO: run task X

        // TODO: start the subscriber
    }
}

Есть идеи?

1 Ответ

3 голосов
/ 01 мая 2020

Использование Cloud Pub / Sub таким способом является анти-паттерном и может вызвать проблемы. Если вы сразу же подтвердите сообщение после его получения, но до его обработки, что вы будете делать, если по какой-то причине произойдет сбой подписчика? Pub / Sub не будет повторно доставлять сообщение и, следовательно, может никогда не обработать его потенциально.

Поэтому вы, вероятно, хотите дождаться подтверждения, пока сообщение не будет обработано. Но тогда вы не сможете закрыть подписчика, потому что тот факт, что сообщение является ожидающим, будет потерян, и, следовательно, истечет срок подтверждения, и сообщение будет доставлено.

Если вы хотите, чтобы клиент получал только одно сообщение за раз, вы можете использовать FlowControlSettings на клиенте. Если вы установите MaxOutstandingElementCount на 1, то на receiveMessage будет одновременно доставляться только одно сообщение:

subscriber = Subscriber.newBuilder(projectSubscriptionName, new PubSubRoeMessageReceiver())
    .setFlowControlSettings(FlowControlSettings.newBuilder()
        .setMaxOutstandingRequestBytes(10L * 1024L * 1024L) // 10MB messages allowed.
        .setMaxOutstandingElementCount(1L) // Only 1 outstanding message at a time.
        .build())
    .build();

Имейте в виду, что если у вас большой объем невыполненных небольших сообщений в то время, когда вы запускаете подписчика и намереваетесь запустить нескольких подписчиков, вы можете столкнуться с неэффективной балансировкой нагрузки, как объяснено в документации .

...