Собирать данные в потоке, пока не будет изменен идентификатор? - PullRequest
0 голосов
/ 08 мая 2020

У меня есть конвейерная линия потоков:

pipeline(readStream, transformStream, writeStream);

readStream, передаваемый в объект transformStream (при каждом событии «data»), например:

{
   id: 1,
   name: 'John',
   phone: 1000000,
}

Мне нужно хранить телефоны в преобразовать поток до тех пор, пока идентификатор не будет изменен в этом объекте, тогда я должен sh объект, подобный этому, в буфер потока:

{
   id: 1,
   name: 'John',
   phones: [1000000, 1000001, 1000002],
}

Таким образом, массив:

[
  {
   id: 1,
   name: 'John',
   phone: 1000000,
  },
  {
   id: 1,
   name: 'John',
   phone: 1000001,
  },
  {
   id: 2,
   name: 'Ray',
   phone: 1000002,
  },
  {
   id: 3,
   name: 'Santa',
   phone: 1000003,
  },
]

после преобразования потока будет быть:

[
  {
   id: 1,
   name: 'John',
   phones: [1000000, 1000001],
  },
  {
   id: 2,
   name: 'Ray',
   phones: [1000002],
  },
  {
   id: 3,
   name: 'Santa',
   phones: [1000003],
  },
]

Как я могу это реализовать?

1 Ответ

0 голосов
/ 08 мая 2020

Вот простой пример. Основная идея состоит в том, чтобы отслеживать предыдущую запись / фрагмент и сравнивать ее с текущей записью, чтобы решить, следует ли обновлять свойство phones или данные pu sh для следующего потока.

const streamify = require('stream-array'); // using this package for testing purposes only
const Stream = require('stream');

const input = [
  {
    id: 1,
    name: 'John',
    phone: 1000000,
  },
  {
    id: 1,
    name: 'John',
    phone: 1000001,
  },
  {
    id: 2,
    name: 'Ray',
    phone: 1000002,
  },
  {
    id: 3,
    name: 'Santa',
    phone: 1000003,
  },
];

function createObjectMergeStream() {
  let previousEntry = null;

  return new Stream.Transform({
    writableObjectMode: true,
    transform: transformFunc,
    flush(callback) {
      callback(null, JSON.stringify(previousEntry)); // stringifying for demonstration only
    }
  });

  function transformFunc(currentEntry, encoding, callback){
    if (previousEntry === null) {
      // if this is the first the stream is receiving
      previousEntry = {
        id: currentEntry.id,
        name: currentEntry.name,
        phones: [currentEntry.phone]
      }

      callback();
    }
    else if (previousEntry.id === currentEntry.id) {
      // if the id's match, only update the phones array
      previousEntry.phones.push(currentEntry.phone);
      callback();
    }
    else {
      // if this entry does not match the id of the previous entry

      // stringifying for demonstration only
      const output = JSON.stringify(previousEntry) + '\n';

      previousEntry = {
        id: currentEntry.id,
        name: currentEntry.name,
        phones: [currentEntry.phone]
      }

      callback(null, output);
    }
  }
}


// turn the `input` array into a readable  stream
const inputStream = streamify(input);

// create our transform stream
const transformStream = createObjectMergeStream();

// take objects from input array, process the objects in our transform stream then print them to the console
inputStream.pipe(transformStream).pipe(process.stdout);

Выходы

{"id":1,"name":"John","phones":[1000000,1000001]}
{"id":2,"name":"Ray","phones":[1000002]}
{"id":3,"name":"Santa","phones":[1000003]}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...