ES7 обещает и ожидает асинхронную функцию, которая зацикливается на фоне - PullRequest
0 голосов
/ 06 июля 2018

Это может быть особый случай:

Я хочу читать из очереди (AWS SQS), что делается путем вызова, который ждет сообщения в течение нескольких секунд, а затем разрешается - и снова и снова вызывается в цикле, пока вы хотите обработать это очередь (каждый раз проверяется флаг).

Это означает, что у меня есть функция consume, которая работает, пока приложение активно или очередь не помечена.

И у меня также есть функция subscribe, используемая для подписки на очередь - которая должна разрешиться, как только он узнает, что потребитель может подключиться к очереди. Хотя эта функция вызывает потребителя, который продолжает работать и не возвращается, пока очередь не будет помечена.

Это доставляет мне некоторые проблемы - есть ли у вас какие-либо советы о том, как решить эту проблему с помощью современных JS и асинхронных / ожидающих обещаний? Я имею в виду, что этот код выполняется в веб-приложении React, а не в node.js.

Я просто хочу, чтобы вызов await subscribe(QUEUE) (поступающий из графического интерфейса пользователя) разрешался, как только он будет уверен, что он может читать из этой очереди. Но если это невозможно, я хочу, чтобы он выдавал ошибку, которая распространяется на источник вызова подписки - что означает, что я должен await consume(QUEUE), верно?

Обновление: Был добавлен некоторый непроверенный черновой код (я не хочу тратить больше времени на то, чтобы заставить его работать, если я не делаю правильный подход) - я думал об отправке обратного вызова об успешном и неудачном выполнении функции-потребителя, чтобы он мог сообщить об успехе как только он получит первый действительный (но, возможно, пустой) ответ из очереди, что заставит его сохранить URL-адрес очереди в качестве подписки - и отменить подписку, если опрос очереди завершится неудачей.

Поскольку я настраиваю несколько потребителей очереди, они не должны ничего блокировать, а просто работают в фоновом режиме

let subscribedQueueURLs = []

async function consumeQueue(
  url: QueueURL,
  success: () => mixed,
  failure: (error: Error) => mixed
) {
  const sqs = new AWS.SQS()
  const params = {
    QueueUrl: url,
    WaitTimeSeconds: 20,
  }

  try {
    do {
      // eslint-disable-next-line no-await-in-loop
      const receivedData = await sqs.receiveMessage(params).promise()
      if (!subscribedQueueURLs.includes(url)) {
        success()
      }
      // eslint-disable-next-line no-restricted-syntax
      for (const message of receivedData.Messages) {
        console.log({ message })
        // eslint-disable-next-line no-await-in-loop
        eventHandler && (await eventHandler.message(message, url))

        const deleteParams = {
          QueueUrl: url,
          ReceiptHandle: message.ReceiptHandle,
        }
        // eslint-disable-next-line no-await-in-loop
        const deleteResult = await sqs.deleteMessage(deleteParams).promise()
        console.log({ deleteResult })
      }
    } while (subscribedQueueURLs.includes(url))
  } catch (error) {
    failure(error)
  }
}

export const subscribe = async (entityType: EntityType, entityId: EntityId) => {
  const url = generateQueueURL(entityType, entityId)
  consumeQueue(
    url,
    () => {
      subscribedQueueURLs.push(url)
      eventHandler && eventHandler.subscribe(url)
    },
    error => {
      console.error(error)
      unsubscribe(entityType, entityId)
    }
  )
}

1 Ответ

0 голосов
/ 07 июля 2018

В итоге я решил это так - возможно, не самое элегантное решение ...

let eventHandler: ?EventHandler
let awsOptions: ?AWSOptions
let subscribedQueueUrls = []
let sqs = null
let sns = null


export function setup(handler: EventHandler) {
  eventHandler = handler
}

export async function login(
  { awsKey, awsSecret, awsRegion }: AWSCredentials,
  autoReconnect: boolean
) {
  const credentials = new AWS.Credentials(awsKey, awsSecret)
  AWS.config.update({ region: awsRegion, credentials })
  sqs = new AWS.SQS({ apiVersion: '2012-11-05' })
  sns = new AWS.SNS({ apiVersion: '2010-03-31' })
  const sts = new AWS.STS({ apiVersion: '2011-06-15' })
  const { Account } = await sts.getCallerIdentity().promise()
  awsOptions = { accountId: Account, region: awsRegion }
  eventHandler && eventHandler.login({ awsRegion, awsKey, awsSecret }, autoReconnect)
}


async function handleQueueMessages(messages, queueUrl) {
  if (!sqs) {
    throw new Error(
      'Attempt to subscribe before SQS client is ready (i.e. authenticated).'
    )
  }
  // eslint-disable-next-line no-restricted-syntax
  for (const message of messages) {
    if (!eventHandler) {
      return
    }
    // eslint-disable-next-line no-await-in-loop
    await eventHandler.message({
      content: message,
      queueUrl,
      timestamp: new Date().toISOString(),
    })

    const deleteParams = {
      QueueUrl: queueUrl,
      ReceiptHandle: message.ReceiptHandle,
    }
    // eslint-disable-next-line no-await-in-loop
    await sqs.deleteMessage(deleteParams).promise()
  }
}


export async function subscribe(queueUrl: QueueUrl) {
  if (!sqs) {
    throw new Error(
      'Attempt to subscribe before SQS client is ready (i.e. authenticated).'
    )
  }

  const initialParams = {
    QueueUrl: queueUrl,
    WaitTimeSeconds: 0,
    MessageAttributeNames: ['All'],
    AttributeNames: ['All'],
  }

  const longPollParams = {
    ...initialParams,
    WaitTimeSeconds: 20,
  }

  // Attempt to consume the queue, and handle any pending messages.
  const firstResponse = await sqs.receiveMessage(initialParams).promise()
  if (!subscribedQueueUrls.includes(queueUrl)) {
    subscribedQueueUrls.push(queueUrl)
    eventHandler && eventHandler.subscribe(queueUrl)
  }
  handleQueueMessages(firstResponse.Messages, queueUrl)

  // Keep on polling the queue afterwards.
  setImmediate(async () => {
    if (!sqs) {
      throw new Error(
        'Attempt to subscribe before SQS client is ready (i.e. authenticated).'
      )
    }
    try {
      do {
        // eslint-disable-next-line no-await-in-loop
        const received = await sqs.receiveMessage(longPollParams).promise()

        handleQueueMessages(received.Messages, queueUrl)
      } while (sqs && subscribedQueueUrls.includes(queueUrl))
    } catch (error) {
      eventHandler && eventHandler.disconnect()
      throw error
    }
  })
}
...