Очередь подписки Rxjs - PullRequest
       7

Очередь подписки Rxjs

0 голосов
/ 09 сентября 2018

В моем угловом приложении есть подписка на firebase, которая срабатывает несколько раз. Как добиться того, чтобы задачи обрабатывались как очередь, чтобы я мог запускать каждую задачу синхронно один раз?

this.tasks.subscribe(async tasks => {
   for (const x of tasks) 
      await dolongtask(x); // has to be sync
      await removetask(x);
   });

Проблема в том, что событие subribe срабатывает, когда длинная задача еще обрабатывается.

Ответы [ 3 ]

0 голосов
/ 10 сентября 2018

ИМХО, я бы попытался использовать мощь rxjs, поскольку мы все равно используем его здесь и избегаем реализации пользовательской концепции организации очереди, как это предлагается в другом ответе (хотя вы, конечно, можете это сделать).

Если мы немного упростим данный случай, у нас просто есть некоторые наблюдаемые и мы хотим выполнить длительную процедуру для каждого излучения - в последовательности . rxjs позволяет сделать это с помощью оператора concatMap, по сути, из коробки:

$data.pipe(concatMap(item => processItem(item))).subscribe();

Это только предполагает, что processItem возвращает наблюдаемое. Поскольку вы использовали await, я предполагаю, что ваши функции в настоящий момент возвращают обещания. Их можно легко преобразовать в наблюдаемые с помощью from.

Единственная деталь, которую осталось рассмотреть из OP, - это то, что наблюдаемое фактически испускает массив элементов, и мы хотим выполнить операцию с каждым элементом каждого излучения. Чтобы сделать это, мы просто выравниваем наблюдаемое, используя mergeMap.


Давайте сложим все это вместе. Обратите внимание, что если вы уберете подготовку некоторых данных-заглушек и ведение журнала, фактическая реализация этого будет всего лишь двумя строками кода (с использованием mergeMap + concatMap).

const { from, interval } = rxjs;
const { mergeMap, concatMap, take, bufferCount, tap } = rxjs.operators;

// Stub for the long-running operation
function processTask(task) {
  console.log("Processing task: ", task);
  return new Promise(resolve => {
    setTimeout(() => {
      console.log("Finished task: ", task);
      resolve(task);
    }, 500 * Math.random() + 300);
  });
}

// Turn processTask into a function returning an observable
const processTask$ = item => from(processTask(item));

// Some stubbed data stream
const tasks$ = interval(250).pipe(
  take(9),
  bufferCount(3),
);

tasks$.pipe(
  tap(task => console.log("Received task: ", task)),
  // Flatten the tasks array since we want to work in sequence anyway
  mergeMap(tasks => tasks),
  // Process each task, but do so consecutively
  concatMap(task => processTask$(task)),
).subscribe(null, null, () => console.log("Done"));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>
0 голосов
/ 11 сентября 2018

Оставляя в стороне ваш заголовок «Очередь подписки Rxjs», вы фактически можете исправить свой асинхронный / ожидающий код.

Проблема в том, что async / await не работает с циклами for, см. Этот вопрос Использование async / await с циклом forEach .

Например, вы можете заменить цикл for согласно ответу @ Bergi,

с Promise.all()

console.clear();
const { interval } = rxjs;
const { take, bufferCount } = rxjs.operators;

function processTask(task) {
  console.log(`Processing task ${task}`);
  return new Promise(resolve => {
    setTimeout(() => {
      resolve(task);
    }, 500 * Math.random() + 300);
  });
}
function removeTask(task) {
  console.log(`Removing task ${task}`);
  return new Promise(resolve => {
    setTimeout(() => {
      resolve(task);
    }, 50);
  });
}

const tasks$ = interval(250).pipe(
  take(10),
  bufferCount(3),
);

tasks$.subscribe(async tasks => {
  await Promise.all(
    tasks.map(async task => {
      await processTask(task); // has to be sync
      await removeTask(task);
      console.log(`Finished task ${task}`);
    })
  );
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>

Еще лучше, вы можете сформировать запрос, чтобы избежать использования цикла for,

с mergeMap()

console.clear();
const { interval } = rxjs;
const { mergeMap, take, bufferCount } = rxjs.operators;

function processTask(task) {
  console.log(`Processing task ${task}`);
  return new Promise(resolve => {
    setTimeout(() => {
      resolve(task);
    }, 500 * Math.random() + 300);
  });
}
function removeTask(task) {
  console.log(`Removing task ${task}`);
  return new Promise(resolve => {
    setTimeout(() => {
      resolve(task);
    }, 50);
  });
}

const tasks$ = interval(250).pipe(
  take(10),
  bufferCount(3),
);

tasks$
.pipe(mergeMap(tasks => tasks))
.subscribe(
  async task => {
    await processTask(task); // has to be sync
    await removeTask(task);
    console.log(`Finished task ${task}`);
  }
);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>
0 голосов
/ 10 сентября 2018

Я делаю пару предположений из кода, который вы дали,

  • другие приложения добавляют задачи в базу данных Firebase (асинхронно), и этот код реализует процессор задач.

  • Ваш запрос Firebase возвращает все необработанные задачи (в коллекции) и выдает полный список каждый раз, когда добавляется новая задача.

  • запрос отбросит задачу только после запуска removeTask()

Если это так, вам нужен механизм дедупликации перед процессором.

В целях иллюстрации я смоделировал запрос firebase с субъектом (переименовал его в tasksQuery $), а последовательность событий firebase смоделирована в нижней части скрипта. Надеюсь, это не слишком запутанно!

console.clear()
const { mergeMap, filter } = rxjs.operators;

// Simulate tasks query  
const tasksQuery$ = new rxjs.Subject();

// Simulate dolongtask and removetask (assume both return promises that can be awaited)
const dolongtask = (task) => {
  console.log( `Processing: ${task.id}`);
  return new Promise(resolve => {
    setTimeout(() => {
      console.log( `Processed: ${task.id}`);
      resolve('done')
    }, 1000);
  });
}
const removeTask = (task) => {
  console.log( `Removing: ${task.id}`);
  return new Promise(resolve => {
    setTimeout(() => {
      console.log( `Removed: ${task.id}`);
      resolve('done')
    }, 200);
  });
}

// Set up queue (this block could be a class in Typescript)
let tasks = [];
const queue$ = new rxjs.Subject();
const addToQueue = (task) => {
  tasks = [...tasks, task];
  queue$.next(task);
}
const removeFromQueue = () => tasks = tasks.slice(1);
const queueContains = (task) => tasks.map(t => t.id).includes(task.id)

// Dedupe and enqueue
tasksQuery$.pipe(
  mergeMap(tasks => tasks), // flatten the incoming task array 
  filter(task => task && !queueContains(task)) // check not in queue
).subscribe(task => addToQueue(task) );

//Process the queue
queue$.subscribe(async task => {
  await dolongtask(task);
  await removeTask(task); // Assume this sends 'delete' to firebase
  removeFromQueue();
});

// Run simulation
tasksQuery$.next([{id:1},{id:2}]);
// Add after delay to show repeated items in firebase
setTimeout(() => {
  tasksQuery$.next([{id:1},{id:2},{id:3}]); 
}, 500);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.2/rxjs.umd.js"></script>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...