Жду правильного подключения к кролику в nodejs - PullRequest
0 голосов
/ 28 февраля 2019

Я пытаюсь написать свою простую оболочку Eventemitter для amqplib / callback_api.У меня проблемы с обработкой ситуации, когда кролик недоступен или отключен.У меня есть метод getConnect, который возвращает Promise, который разрешается, когда соединение установлено.Но если в соединении отказано, Promise явно отвергает.Как заставить этот метод выполнить переподключение, пока соединение не будет установлено

/**
     * Async method getConnect for connection
     * @returns {Promise<*>}
     */
    getConnect = async () => {
        return new Promise((resolve, reject) => {
            amqp.connect(this.config.url, async function(err, conn) {
                    if (err) {
                        reject(err);
                    }
                    resolve(conn);
            })
        })
    };

Весь код здесь https://github.com/kimonniez/rabbitEE

Возможно, я уже очень сонный, но я совершенно сбит с толку:) Заранее спасибо!

Ответы [ 2 ]

0 голосов
/ 28 февраля 2019

Оберните ваш Promise внутри Observable

Promise не для обработки логики "retry" .Если вы хотите это сделать, вам следует изучить Observables , используя библиотеку rxjs .Это позволит вам повторить попытку, используя произвольный интервал времени при обнаружении ошибок.

const { from, interval, of } = rxjs;
const { catchError, mergeMap, tap, skipWhile, take } = rxjs.operators;

const THRESHOLD = 3;
const RETRY_INTERVAL = 1000;

// Equivalent to 'amqp.connect'
const functionThatThrows = number =>
  number < THRESHOLD
    ? Promise.reject(new Error("ERROR"))
    : Promise.resolve("OK");

// Equivalent to `getConnect`
const getConnect = () =>
  interval(RETRY_INTERVAL)
    .pipe(
      mergeMap(x => from(functionThatThrows(x)).pipe(catchError(e => of(e)))),
      skipWhile(x => {
        const isError = x instanceof Error;
        if (isError) console.log('Found error. Retrying...');
        return isError;
      }),
      take(1)
    ).toPromise();

// Resolve only if the inner Promise is resolved
getConnect().then(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>

Объяснение

  1. Создать источник с interval из 1000.Это означает, что он будет повторять каждую секунду
  2. Позвоните вашему amqp.connect, что эквивалентно functionThatThrows в моем примере
  3. Перехватите ошибку с помощью оператора catchError и верните ее
  4. Пропустить, пока возвращаемый объект является ошибкой.Это позволит вам разрешить , только если ваше Promise было разрешено и не отклонено
  5. Получите первый разрешенный результат, используя take(1)
  6. Конвертируйте Наблюдаемый в обещании с помощью служебной функции toPromise
  7. Вызовите свою функцию и присоедините then, как вы делаете со стандартным Promise
0 голосов
/ 28 февраля 2019

Если вы просто хотите продолжать подключение до тех пор, пока соединение не будет установлено, вы можете заключить метод getConnect в новый метод keepConnect:

keepConnect = async () => {
   while (true) {
      try {
         let conn = await getConnect()
         return conn
      } catch (e) {}
  }
}

Но я думаю, что было бы лучшереализовать что-то вроде «попытаться подключиться n раз», изменив условие while.В общем случае решение «в то время как истина» не является чистым и может работать плохо, что может привести к замедлению цикла обработки событий (представьте, что метод connect всегда будет возвращать ошибку в течение нескольких миллисекунд).

Вы также можете реализовать систему прогрессивных задержек между попытками подключения, используя в качестве идеи оболочку keepConnect.

Если вы вместо этого хотите восстановить соединение при потере соединения, то это связаноКролику (что я не знаю) и его событиям.

...