Это может быть особый случай:
Я хочу читать из очереди (AWS SQS), что делается путем вызова, который ждет сообщения в течение нескольких секунд, а затем разрешается - и снова и снова вызывается в цикле, пока вы хотите обработать это очередь (каждый раз проверяется флаг).
Это означает, что у меня есть функция consume
, которая работает, пока приложение активно или очередь не помечена.
И у меня также есть функция subscribe
, используемая для подписки на очередь - которая должна разрешиться, как только он узнает, что потребитель может подключиться к очереди. Хотя эта функция вызывает потребителя, который продолжает работать и не возвращается, пока очередь не будет помечена.
Это доставляет мне некоторые проблемы - есть ли у вас какие-либо советы о том, как решить эту проблему с помощью современных JS и асинхронных / ожидающих обещаний? Я имею в виду, что этот код выполняется в веб-приложении React, а не в node.js.
Я просто хочу, чтобы вызов await subscribe(QUEUE)
(поступающий из графического интерфейса пользователя) разрешался, как только он будет уверен, что он может читать из этой очереди. Но если это невозможно, я хочу, чтобы он выдавал ошибку, которая распространяется на источник вызова подписки - что означает, что я должен await consume(QUEUE)
, верно?
Обновление:
Был добавлен некоторый непроверенный черновой код (я не хочу тратить больше времени на то, чтобы заставить его работать, если я не делаю правильный подход) - я думал об отправке обратного вызова об успешном и неудачном выполнении функции-потребителя, чтобы он мог сообщить об успехе как только он получит первый действительный (но, возможно, пустой) ответ из очереди, что заставит его сохранить URL-адрес очереди в качестве подписки - и отменить подписку, если опрос очереди завершится неудачей.
Поскольку я настраиваю несколько потребителей очереди, они не должны ничего блокировать, а просто работают в фоновом режиме
let subscribedQueueURLs = []
async function consumeQueue(
url: QueueURL,
success: () => mixed,
failure: (error: Error) => mixed
) {
const sqs = new AWS.SQS()
const params = {
QueueUrl: url,
WaitTimeSeconds: 20,
}
try {
do {
// eslint-disable-next-line no-await-in-loop
const receivedData = await sqs.receiveMessage(params).promise()
if (!subscribedQueueURLs.includes(url)) {
success()
}
// eslint-disable-next-line no-restricted-syntax
for (const message of receivedData.Messages) {
console.log({ message })
// eslint-disable-next-line no-await-in-loop
eventHandler && (await eventHandler.message(message, url))
const deleteParams = {
QueueUrl: url,
ReceiptHandle: message.ReceiptHandle,
}
// eslint-disable-next-line no-await-in-loop
const deleteResult = await sqs.deleteMessage(deleteParams).promise()
console.log({ deleteResult })
}
} while (subscribedQueueURLs.includes(url))
} catch (error) {
failure(error)
}
}
export const subscribe = async (entityType: EntityType, entityId: EntityId) => {
const url = generateQueueURL(entityType, entityId)
consumeQueue(
url,
() => {
subscribedQueueURLs.push(url)
eventHandler && eventHandler.subscribe(url)
},
error => {
console.error(error)
unsubscribe(entityType, entityId)
}
)
}