Google Cloud PubSub не подтверждает сообщения - PullRequest
0 голосов
/ 08 февраля 2019

У нас есть система издательских и абонентских систем на базе GCP PubSub.Абонент обрабатывает одно сообщение довольно долго, около 1 минуты.Мы уже установили для подписчиков крайний срок подтверждения 600 секунд (10 минут) (максимальный), чтобы убедиться, что pubsub не начнет повторную доставку слишком рано, так как в основном у нас здесь длительная операция.

Я вижу этоповедение PubSub.В то время как код отправляет подтверждение, и монитор подтверждает, что запрос подтверждения PubSub был принят, а само подтверждение завершено со статусом успеха, общее количество неупакованных сообщений остается прежним.

enter image description here

Метрики на диаграммах показывают одинаковые значения для выравнивателя суммы, количества и среднего значения агрегации.На приведенном выше рисунке выравниватель имеет среднее значение, и никакие редукторы не включены.

Я использую библиотеку @ google-cloud / pubsub Node.js.Были опробованы разные версии (0.18.1, 0.22.2, 0.24.1), но я думаю, проблема не в них.

Следующий класс можно использовать для проверки.

TypeScript 3.1.1, Узел 8.xx - 10.xx

import { exponential, Backoff } from "backoff";

const pubsub = require("@google-cloud/pubsub");

export interface IMessageHandler {
    handle (message): Promise<void>;
}

export class PubSubSyncListener {
    private readonly client;

    private listener: Backoff;

    private runningOperations: Promise<unknown>[] = [];

    constructor (
        private readonly handler: IMessageHandler,
        private readonly options: {
            /**
             * Maximal messages number to be processed simultaniosly.
             * Listener will try to keep processing number as close to provided value
             * as possible.
             */
            maxMessages: number;
            /**
             * Formatted full subscrption name /projects/{projectName}/subscriptions/{subscriptionName}
             */
            subscriptionName: string;
            /**
             * In milliseconds
             */
            minimalListenTimeout?: number;
            /**
             * In milliseconds
             */
            maximalListenTimeout?: number;
        }
    ) {
        this.client = new pubsub.v1.SubscriberClient();

        this.options = Object.assign({
            minimalListenTimeout: 300,
            maximalListenTimeout: 30000
        }, this.options);
    }

    public async listen () {
        this.listener = exponential({
            maxDelay: this.options.maximalListenTimeout,
            initialDelay: this.options.minimalListenTimeout
        });

        this.listener.on("ready", async () => {
            if (this.runningOperations.length < this.options.maxMessages) {
                const [response] = await this.client.pull({
                    subscription: this.options.subscriptionName,
                    maxMessages: this.options.maxMessages - this.runningOperations.length
                });

                for (const m of response.receivedMessages) {
                    this.startMessageProcessing(m);
                }
                this.listener.reset();
                this.listener.backoff();
            } else {
                this.listener.backoff();
            }
        });

        this.listener.backoff();
    }

    private startMessageProcessing (message) {
        const index = this.runningOperations.length;

        const removeFromRunning = () => {
            this.runningOperations.splice(index, 1);
        };

        this.runningOperations.push(
            this.handler.handle(this.getHandlerMessage(message))
                .then(removeFromRunning, removeFromRunning)
        );
    }

    private getHandlerMessage (message) {
        message.message.ack = async () => {
            const ackRequest = {
                subscription: this.options.subscriptionName,
                ackIds: [message.ackId]
            };

            await this.client.acknowledge(ackRequest);
        };

        return message.message;
    }

    public async stop () {
        this.listener.reset();
        this.listener = null;
        await Promise.all(
            this.runningOperations
        );
    }
}

Это в основном частичная реализация асинхронного извлечения сообщений и немедленного подтверждения.Поскольку одно из предложенных решений заключалось в использовании синхронного извлечения.

Я обнаружил аналогичную проблему в репозитории java, если я не ошибаюсь в симптомах проблемы.

https://github.com/googleapis/google-cloud-java/issues/3567

Последняя деталь здесь заключается в том, что подтверждение работает с небольшим количеством запросов.В случае, если я запускаю одно сообщение в pubsub, а затем сразу же обрабатываю его, количество недоставленных сообщений уменьшается (уменьшается до 0, так как раньше было только одно сообщение).

Сам вопрос - что происходит и почему число непрочитанных сообщенийне уменьшается, как следует, когда получено подтверждение?

1 Ответ

0 голосов
/ 11 февраля 2019

Чтобы цитировать из документации , метрика подписки / num_undelivered_messages, которую вы используете, представляет собой «Количество неподтвержденных сообщений (или сообщений о невыполненных заданиях) в подписке. Выборка производится каждые 60 секунд. Послевыборка, данные не отображаются в течение 120 секунд."

Не следует ожидать, что этот показатель уменьшится сразу после получения сообщения.Кроме того, это звучит так, как будто вы пытаетесь использовать pubsub для одного случая доставки, пытаясь убедиться, что сообщение не будет доставлено снова.Cloud Pub / Sub не предоставляет эту семантику.Это обеспечивает хотя бы раз семантику.Другими словами, даже если вы получили значение, подтвердили его, получили ответ ack и увидели снижение показателя с 1 до 0, все равно можно и правильно для того же работника или другого получить точную копию этого сообщения,Хотя на практике это маловероятно, вам следует сосредоточиться на построении системы, допускающей дублирование, вместо того, чтобы пытаться обеспечить успешное подтверждение подтверждения, чтобы ваше сообщение не доставлялось.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...