Google PubSub последние сообщения не обрабатываются - PullRequest
0 голосов
/ 28 ноября 2018

Я использовал пример подписчика из документации Google для Google PubSub, единственное изменение, которое я сделал, - это комментирование подтверждения сообщений.

Подписчик больше не добавляет сообщения в очередь, в то время как сообщения должны повторно отправляться в соответствии с интервалом, установленным в облачной консоли Google.

Почему это происходит или я что-то упустил?

public class SubscriberExample {

use the default project id
private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();

private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();

static class MessageReceiverExample implements MessageReceiver {



    @Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        messages.offer(message);

        //consumer.ack();
    }
}
/** Receive messages over a subscription. */
public static void main(String[] args) throws Exception {
    // set subscriber id, eg. my-sub
    String subscriptionId = args[0];
    ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(
            PROJECT_ID, subscriptionId);
    Subscriber subscriber = null;
    try {
        // create a subscriber bound to the asynchronous message receiver
        subscriber = Subscriber.newBuilder(subscriptionName, new MessageReceiverExample()).build();
        subscriber.startAsync().awaitRunning();
        // Continue to listen to messages
        while (true) {
            PubsubMessage message = messages.take();
            System.out.println("Message Id: " + message.getMessageId());
            System.out.println("Data: " + message.getData().toStringUtf8());
        }
    } finally {
       if (subscriber != null) {
            subscriber.stopAsync();
        }
    }
}
}

1 Ответ

0 голосов
/ 28 ноября 2018

Когда вы не подтверждаете сообщения, клиентская библиотека Java вызывает modifyAckDeadline для сообщения до тех пор, пока не пройдет maxAckExtensionPeriod .По умолчанию это значение составляет один час.Поэтому, если вы не подтвердите сообщение или не измените это значение, скорее всего, сообщение не будет доставлено в течение часа.Если вы хотите изменить максимальный период расширения подтверждения, установите его на компоновщике:

subscriber = Subscriber.newBuilder(subscriptionName, new MessageReceiverExample())
    .setMaxAckExtensionPeriod(Duration.ofSeconds(60))
    .build();               

Стоит также отметить, что когда вы не получаете или не подтверждаете сообщения, тогда flow control может помешать доставке большего количества сообщений.По умолчанию клиентская библиотека Java допускает выдачу до 1000 сообщений, т. Е. В ожидании подтверждения ack или nack или в течение периода расширения max ack.

...