Как вы оборачиваете записи sequelize, которые происходят в STREAM node.js в транзакции? - PullRequest
0 голосов
/ 11 июля 2019

Я проверяю файлы CSV и затем записываю их в базу данных.Текущая настройка - File.pipe (validationStream) .pipe (DBWriteStream);Как мне обернуть все записи в моем DBWriteStream в транзакцию, которую можно отменить, если в validationStream обнаружены ошибки?Если ошибки найдены, я хочу отменить весь файл.

Node.js 7, как использовать транзакцию сиквелизирования с async / await?

Приведенный выше пример, а также приведенный в документации, зависят от цепочек обещаний и асинхронных функций.тем не менее, мой поток является функцией записи потока преобразования и не использует Promise.

async function getWriteStream(params) {
  const rowLimit = 2000;
  let rowsToWrite = [];

  const writeStream = through2.obj(write, end);

  // I'd like to rollback this report creation
  const result = await createReportId(params);
  const reportId = result.id;

  // buffer = {isValid: bool, data: rowData}
  // if I receive an "isValid: false" from the validation stream,
  // i'd like to stop writing and rollback everything
  function write(buffer, encoding, next) {
    let rollBack = false;

    if (buffer.isValid && !rollBack) {
      buffer.data.forEach(e => {
        e.some_report_id = reportId;
        pushRows(e);
      });
    } else if (!rollBack) {
      rollBack = true;
      // rollback transaction?
    }
    next();
  }

  function end(done) {
    writeRows();
    done();
  }

  async function pushRows(row) {
    rowsToWrite.push(row);
    if (rowsToWrite.length >= rowLimit) {
      await writeRows();
    }
  }

  // want to rollback all of these writes
  async function writeRows() {
    if (rowsToWrite.length > 0) {
      const toSave = rowsToWrite.slice(0);
      rowsToWrite = [];
      await Some.bulkCreate(toSave);
    }
  }
  return writeStream;
}

Моя проблема в том, что я не хочу выдавать ошибку и останавливать поток проверки, потому что, если в CSV есть несколько ошибок, я хотел бы собрать всю эту информацию в первый раз и отправитьпользователю.

Кроме того, строки на самом деле записываются в функции «запись» потока преобразования, которая не является асинхронной функцией, поэтому я не могу ее ждать.

...