У нас есть система издательских и абонентских систем на базе GCP PubSub.Абонент обрабатывает одно сообщение довольно долго, около 1 минуты.Мы уже установили для подписчиков крайний срок подтверждения 600 секунд (10 минут) (максимальный), чтобы убедиться, что pubsub не начнет повторную доставку слишком рано, так как в основном у нас здесь длительная операция.
Я вижу этоповедение PubSub.В то время как код отправляет подтверждение, и монитор подтверждает, что запрос подтверждения PubSub был принят, а само подтверждение завершено со статусом успеха, общее количество неупакованных сообщений остается прежним.
Метрики на диаграммах показывают одинаковые значения для выравнивателя суммы, количества и среднего значения агрегации.На приведенном выше рисунке выравниватель имеет среднее значение, и никакие редукторы не включены.
Я использую библиотеку @ google-cloud / pubsub Node.js.Были опробованы разные версии (0.18.1, 0.22.2, 0.24.1), но я думаю, проблема не в них.
Следующий класс можно использовать для проверки.
TypeScript 3.1.1, Узел 8.xx - 10.xx
import { exponential, Backoff } from "backoff";
const pubsub = require("@google-cloud/pubsub");
export interface IMessageHandler {
handle (message): Promise<void>;
}
export class PubSubSyncListener {
private readonly client;
private listener: Backoff;
private runningOperations: Promise<unknown>[] = [];
constructor (
private readonly handler: IMessageHandler,
private readonly options: {
/**
* Maximal messages number to be processed simultaniosly.
* Listener will try to keep processing number as close to provided value
* as possible.
*/
maxMessages: number;
/**
* Formatted full subscrption name /projects/{projectName}/subscriptions/{subscriptionName}
*/
subscriptionName: string;
/**
* In milliseconds
*/
minimalListenTimeout?: number;
/**
* In milliseconds
*/
maximalListenTimeout?: number;
}
) {
this.client = new pubsub.v1.SubscriberClient();
this.options = Object.assign({
minimalListenTimeout: 300,
maximalListenTimeout: 30000
}, this.options);
}
public async listen () {
this.listener = exponential({
maxDelay: this.options.maximalListenTimeout,
initialDelay: this.options.minimalListenTimeout
});
this.listener.on("ready", async () => {
if (this.runningOperations.length < this.options.maxMessages) {
const [response] = await this.client.pull({
subscription: this.options.subscriptionName,
maxMessages: this.options.maxMessages - this.runningOperations.length
});
for (const m of response.receivedMessages) {
this.startMessageProcessing(m);
}
this.listener.reset();
this.listener.backoff();
} else {
this.listener.backoff();
}
});
this.listener.backoff();
}
private startMessageProcessing (message) {
const index = this.runningOperations.length;
const removeFromRunning = () => {
this.runningOperations.splice(index, 1);
};
this.runningOperations.push(
this.handler.handle(this.getHandlerMessage(message))
.then(removeFromRunning, removeFromRunning)
);
}
private getHandlerMessage (message) {
message.message.ack = async () => {
const ackRequest = {
subscription: this.options.subscriptionName,
ackIds: [message.ackId]
};
await this.client.acknowledge(ackRequest);
};
return message.message;
}
public async stop () {
this.listener.reset();
this.listener = null;
await Promise.all(
this.runningOperations
);
}
}
Это в основном частичная реализация асинхронного извлечения сообщений и немедленного подтверждения.Поскольку одно из предложенных решений заключалось в использовании синхронного извлечения.
Я обнаружил аналогичную проблему в репозитории java, если я не ошибаюсь в симптомах проблемы.
https://github.com/googleapis/google-cloud-java/issues/3567
Последняя деталь здесь заключается в том, что подтверждение работает с небольшим количеством запросов.В случае, если я запускаю одно сообщение в pubsub, а затем сразу же обрабатываю его, количество недоставленных сообщений уменьшается (уменьшается до 0, так как раньше было только одно сообщение).
Сам вопрос - что происходит и почему число непрочитанных сообщенийне уменьшается, как следует, когда получено подтверждение?