Слушатель Google pubsub не получает все сообщения - PullRequest
1 голос
/ 03 августа 2020

Я использую Google Cloud Storage для хранения объектов с корзиной, связанной с topi c и идентификатором подписки. Поток таков, что приложение Java запрашивает ссылку (-ы) для загрузки и загружает объект (-ы), используя эти ссылки для загрузки. У меня также есть прослушиватель pubsub, реализованный в Java, который получает сообщение с уведомлением о загрузке и что-то делает при каждой успешной загрузке. Это фрагмент, который обрабатывает прослушивание событий.

public void eventListener() {
    MessageReceiver messageReceiver = (message, consumer) -> {
        final Map<String, Object> uploadMetaDataMap = getUploadDataMap(message);            
        LOGGER.info("Upload event detected => {} ", uploadMetaDataMap);
        // do something
        consumer.ack();
    };
    Subscriber subscriber = null;
    Subscriber finalSubscriber = subscriber;
    /* To ensure that any messages already being handled by receiveMessage run to completion */
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            finalSubscriber.stopAsync().awaitTerminated();
        }
    });
    try {
        subscriber = Subscriber.newBuilder(subscription, messageReceiver)
                .setCredentialsProvider(FixedCredentialsProvider.create(creds)).build();
        subscriber.addListener(new Subscriber.Listener() {
            @Override
            public void failed(Subscriber.State from, Throwable failure) {
                // Handle failure. This is called when the Subscriber encountered a fatal error and is shutting down.
                LOGGER.error(String.valueOf(failure));
            }
        }, MoreExecutors.directExecutor());
        subscriber.startAsync().awaitRunning();
        subscriber.awaitTerminated();
    } finally {
        if (subscriber != null) {
            subscriber.stopAsync().awaitTerminated();
        }
    }
}

Я сохраняю объекты в этом формате => bucket/uuid/objectName.extension и при каждой успешной загрузке LOGGER.info("Upload event detected => {} ", uploadMetaDataMap); регистрирует такие сообщения

2020-08-03 16:12:14,686 [Gax-1] INFO  listener.AsynchronousPull - Upload event detected => {size=85, uuid=6dff9a20-3995-4f28-93e9-79e6c3cf613d, bucket=bucketName}

Проблема, с которой я столкнулся сейчас, заключается в том, что не все успешные события загрузки отправляют уведомление. Я вижу структуру папок, созданную в GCS с соответствующим объектом внутри, но уведомление, связанное с этой загрузкой, нигде не может быть найдено в журналах, напечатанных прослушивателем pubsub. Это меня уже давно беспокоит, и мне действительно нужна помощь с этим.

...