nodejs функциональное программирование с генераторами и обещаниями - PullRequest
0 голосов
/ 16 января 2019

Резюме

Является ли функциональное программирование в node.js достаточно общим? может ли он использоваться для решения реальной проблемы обработки небольших массивов записей в БД без загрузки всех записей в памяти с использованием toArray (таким образом, из памяти выходит). Вы можете прочитать эту критику для фона . Мы хотим продемонстрировать возможности Mux и DeMux и fork / tee / join таких библиотек node.js с асинхронными генераторами.

Контекст

Я подвергаю сомнению действительность и общность функционального программирования в node.js, используя любой инструмент функционального программирования (например, ramda , lodash и imlazy ) или даже на заказ.

Учитывая

Миллионы записей из курсора MongoDB, которые можно повторять, используя await cursor.next()

Возможно, вы захотите читать дальше about async generators и for-await-of.

Для поддельных данных можно использовать (на узле 10)

function sleep(ms) {
    return new Promise((resolve) => setTimeout(resolve, ms));
}
async function* getDocs(n) {
  for(let i=0;i<n;++i) {
     await sleep(1);
     yield {i: i, t: Date.now()};
  }
}
let docs=getDocs(1000000);

Требуются

Нам нужно

  • первый документ
  • последний документ
  • количество документов
  • разбивается на пакеты / пакеты из n документов и генерирует событие socket.io для этого объема

Убедитесь, что первый и последний документы включены в пакеты и не используются.

1050 * Ограничения * Миллионы записей не должны быть загружены в оперативную память, их следует перебирать и хранить не более одной партии. Требование может быть выполнено с использованием обычного кода nodejs, но может ли оно быть выполнено с использованием чего-то вроде applyspec, как в здесь . R.applySpec({ first: R.head(), last: R.last(), _: R.pipe( R.splitEvery(n), R.map( (i)=> {return "emit "+JSON.stringify(i);}) ) })(input)

Ответы [ 4 ]

0 голосов
/ 18 января 2019

вот два решения, использующие RxJs и scramjet .

вот решение RxJs

Хитрость заключалась в том, чтобы использовать share(), чтобы first() и last() не потребляли от итератора, forkJoin использовался для их объединения для генерации события done с этими значениями.

function ObservableFromAsyncGen(asyncGen) {
  return Rx.Observable.create(async function (observer) {
    for await (let i of asyncGen) {
      observer.next(i);
    }
    observer.complete();
  });  
}
async function main() {
  let o=ObservableFromAsyncGen(getDocs(100));
  let s = o.pipe(share());
  let f=s.pipe(first());
  let e=s.pipe(last());
  let b=s.pipe(bufferCount(13));
  let c=s.pipe(count());
  b.subscribe(log("bactch: "));
  Rx.forkJoin(c, f, e, b).subscribe(function(a){console.log(
    "emit done with count", a[0], "first", a[1], "last", a[2]);})
}

это scramjet , но это не чисто (функции имеют побочные эффекты)

async function main() {
  let docs = getDocs(100);
  let first, last, counter;
  let s0=Sj.DataStream
    .from(docs)
    .setOptions({ maxParallel: 1 })
    .peek(1, (item)=>first=item[0])
    .tee((s)=>{
        s.reduce((acc, item)=>acc+1, 0)
        .then((item)=>counter=item);
    })
    .tee((s)=>{
        s.reduce((acc, item)=>item)
        .then((item)=>last=item);
    })
    .batch(13)
    .map((batch)=>console.log("emit batch"+JSON.stringify(batch));
  await s0.run();
  console.log("emit done "+JSON.stringify({first: first, last:last, counter:counter}));
}

Я буду работать с @ michał-kapracki, чтобы разработать его чистую версию.

0 голосов
/ 16 января 2019

Я думаю, что, возможно, я разработал для вас ответ некоторое время назад, и он называется scramjet. Он легкий (нет тысяч зависимостей в node_modules), он прост в использовании и делает ваш код очень простым для понимания и чтения.

Давайте начнем с вашего дела:

DataStream
    .from(getDocs(10000))
    .use(stream => {
        let counter = 0;

        const items = new DataStream();
        const out = new DataStream();

        stream
            .peek(1, async ([first]) => out.whenWrote(first))
            .batch(100)
            .reduce(async (acc, result) => {
                await items.whenWrote(result);

                return result[result.length - 1];
            }, null)
            .then((last) => out.whenWrote(last))
            .then(() => items.end());

        items
            .setOptions({ maxParallel: 1 })
            .do(arr => counter += arr.length)
            .each(batch => writeDataToSocketIo(batch))
            .run()
            .then(() => (out.end(counter)))
        ;

        return out;
    })
    .toArray()
    .then(([first, last, count]) => ({ first, count, last }))
    .then(console.log)
;

Так что я не совсем согласен, что javascript FRP является антипаттерном, и я не думаю, что у меня есть единственный ответ на этот вопрос, но при разработке первых коммитов я обнаружил, что синтаксис стрелки ES6 и async / await записаны в виде Цепная мода делает код легко понятным.

Вот еще один пример кода scramjet из OpenAQ конкретно этой строки в процессе их извлечения :

return DataStream.fromArray(Object.values(sources))
  // flatten the sources
  .flatten()
  // set parallel limits
  .setOptions({maxParallel: maxParallelAdapters})
  // filter sources - if env is set then choose only matching source,
  //   otherwise filter out inactive sources.
  // * inactive sources will be run if called by name in env.
  .use(chooseSourcesBasedOnEnv, env, runningSources)
  // mark sources as started
  .do(markSourceAs('started', runningSources))
  // get measurements object from given source
  // all error handling should happen inside this call
  .use(fetchCorrectedMeasurementsFromSourceStream, env)
  // perform streamed save to DB and S3 on each source.
  .use(streamMeasurementsToDBAndStorage, env)
  // mark sources as finished
  .do(markSourceAs('finished', runningSources))
  // convert to measurement report format for storage
  .use(prepareCompleteResultsMessage, fetchReport, env)
  // aggregate to Array
  .toArray()
  // save fetch log to DB and send a webhook if necessary.
  .then(
    reportAndRecordFetch(fetchReport, sources, env, apiURL, webhookKey)
  );

Он описывает все, что происходит с каждым источником данных. Итак, вот мое предложение для допроса. :)

0 голосов
/ 18 января 2019

Чтобы показать, как это можно смоделировать с помощью vanilla JS, мы можем представить идею сворачивания поверх асинхронного генератора, который создает вещи, которые можно комбинировать вместе.

const foldAsyncGen = (of, concat, empty) => (step, fin) => async asyncGen => {
  let acc = empty
  for await (const x of asyncGen) {
    acc = await step(concat(acc, of(x)))
  }
  return await fin(acc)
}

Здесь аргументы разбиты на три части:

  • (of, concat, empty) ожидает, что функция создаст «комбинируемую» вещь, функцию, которая объединит две «комбинируемые» вещи и пустой / начальный экземпляр «комбинируемой» вещи
  • (step, fin) ожидает функцию, которая будет брать «комбинируемую» вещь на каждом шаге и производить Promise «комбинируемой» вещи, которая будет использоваться для следующего шага, и функцию, которая получит окончательную «комбинируемую» вещь вещь после того, как генератор исчерпал себя и выдает Promise конечного результата
  • async asyncGen - асинхронный генератор для обработки

В FP идея «комбинируемой» вещи известна как Monoid , которая определяет некоторые законы, детализирующие ожидаемое поведение объединения двух из них.

Затем мы можем создать моноид, который будет использоваться для прохождения первого, последнего и пакета значений при переходе через генератор.

const Accum = (first, last, batch) => ({
  first,
  last,
  batch,
})

Accum.empty = Accum(null, null, []) // an initial instance of `Accum`

Accum.of = x => Accum(x, x, [x])    // an `Accum` instance of a single value

Accum.concat = (a, b) =>            // how to combine two `Accum` instances together
  Accum(a.first == null ? b.first : a.first, b.last, a.batch.concat(b.batch))

Чтобы отразить идею очистки накопившихся партий, мы можем создать другую функцию, которая принимает функцию onFlush, которая будет выполнять какое-то действие в возвращаемом Promise с очищаемыми значениями и размером n того, когда очистить партию.

Accum.flush = onFlush => n => acc =>
  acc.batch.length < n ? Promise.resolve(acc)
                       : onFlush(acc.batch.slice(0, n))
                           .then(_ => Accum(acc.first, acc.last, acc.batch.slice(n)))

Теперь мы также можем определить, как мы можем сложить экземпляры Accum.

Accum.foldAsyncGen = foldAsyncGen(Accum.of, Accum.concat, Accum.empty)

Определив вышеуказанные утилиты, мы теперь можем использовать их для моделирования вашей конкретной проблемы.

const emit = batch => // This is an analog of where you would emit your batches
  new Promise((resolve) => resolve(console.log(batch)))

const flushEmit = Accum.flush(emit)

// flush and emit every 10 items, and also the remaining batch when finished
const fold = Accum.foldAsyncGen(flushEmit(10), flushEmit(0))

И наконец, запустите ваш пример.

fold(getDocs(100))
  .then(({ first, last })=> console.log('done', first, last))
0 голосов
/ 16 января 2019

Я не уверен, что справедливо подразумевать, что функциональное программирование могло бы предложить какие-либо преимущества перед императивным программированием с точки зрения производительности при работе с огромным количеством данных.

Я думаю, вам нужно добавить еще один инструмент в ваш инструментарий, и это может быть RxJS .

RxJS - это библиотека для составления асинхронных и событийных программ с использованием наблюдаемых последовательностей.

Если вы не знакомы с RxJS или реактивным программированием в целом, мои примеры определенно будут выглядеть странно, но я думаю, что было бы хорошим вложением средств для ознакомления с этими понятиями

В вашем случае наблюдаемой последовательностью является ваш экземпляр MongoDB, который со временем генерирует записи.

Я собираюсь подделать вашу БД:

var db = range(1, 5);

Функция range - это функция RxJS, которая будет выдавать значение в указанном диапазоне.

db.subscribe(n => {
  console.log(`record ${n}`);
});

//=> record 1
//=> record 2
//=> record 3
//=> record 4
//=> record 5

Теперь меня интересуют только первая и последняя запись.

Я могу создать наблюдаемую, которая будет излучать только первую запись, и создать другую, которая будет излучать только последнюю:

var db = range(1, 5);
var firstRecord = db.pipe(first());
var lastRecord = db.pipe(last());

merge(firstRecord, lastRecord).subscribe(n => {
  console.log(`record ${n}`);
});
//=> record 1
//=> record 5

Однако мне также нужно обрабатывать все записи партиями: (в этом примере я собираюсь создать партии по 10 записей в каждой)

var db = range(1, 100);
var batches = db.pipe(bufferCount(10))
var firstRecord = db.pipe(first());
var lastRecord = db.pipe(last());

merge(firstRecord, batches, lastRecord).subscribe(n => {
  console.log(`record ${n}`);
});

//=> record 1
//=> record 1,2,3,4,5,6,7,8,9,10
//=> record 11,12,13,14,15,16,17,18,19,20
//=> record 21,22,23,24,25,26,27,28,29,30
//=> record 31,32,33,34,35,36,37,38,39,40
//=> record 41,42,43,44,45,46,47,48,49,50
//=> record 51,52,53,54,55,56,57,58,59,60
//=> record 61,62,63,64,65,66,67,68,69,70
//=> record 71,72,73,74,75,76,77,78,79,80
//=> record 81,82,83,84,85,86,87,88,89,90
//=> record 91,92,93,94,95,96,97,98,99,100
//=> record 100

Как вы можете видеть на выходе, он испустил:

  1. Первая запись
  2. Десять партий по 10 записей в каждой
  3. Последняя запись

Я не буду пытаться решить ваше упражнение для вас, и я не слишком знаком с RxJS, чтобы слишком подробно рассказать об этом.

Я просто хотел показать вам другой способ и дать вам знать, что это можно совместить с функциональным программированием.

Надеюсь, это поможет

...