Как контролировать размер чанка или считать чанки в Spanner Stream NodeJS - PullRequest
0 голосов
/ 28 апреля 2020

Проблема:

Мы пытались использовать Spanner runStream для переноса части данных (миллионы данных) с помощью указанного c запроса (в выберите запрос мы использовали более одной таблицы)

 spannerDatabase
  .runStream({
    sql,
    params,
    json: true
  })

Но после запуска облачных функций (размером 256 МБ) для этого - у меня Error: memory limit exceeded.

Расследование

После расследования я обнаружил, что @google-cloud/spanner используетpartalResultStream, где maxQueued равно 10, и кажется, что нет подходящих параметров для управления размером порции или количеством порций (каждый ответ порции около 100 000 значения полей)

На данный момент мы пытаемся использовать метод client.executeStreamingSql, но там у нас есть большая реализация (например, подготовка параметров Struct, parseChunk как то, что PartialResultStream делает )

client.executeStreamingSql({
  session: session.id,
  sql,
  paramsType,
  params,
}, {
  pageSize: 1000,
});

Ожидание

Нам нужно уменьшить потребление памяти при чтении с Spanner.

ОБНОВЛЕНО:

Вот пример использования потока в гаечном ключе (мы пытаемся записать в файл)

const { Spanner } = require("@google-cloud/spanner");
const fs = require("fs");
const util = require('util');
const stream = require('stream');
const through = require('through');

const pipeline = util.promisify(stream.pipeline);

const spanner = {
  "instanceId": "..",
  "databaseId": "..",
  "projectId": ".."
};
const filePath = "./users.data.txt";
const keyFilename = "service-account.json";

const spannerDatabase = new Spanner({
  projectId: spanner.projectId,
  keyFilename,
})
  .instance(spanner.instanceId)
  .database(spanner.databaseId);

const spannerStream = spannerDatabase.runStream({
  sql: 'select * from user',
  json: true,
  gaxOptions: {
    pageSize: 1000,
  },
});

const run = async () => {
  await pipeline(
    spannerStream,
    through(function (data) {
      return this.queue(`${JSON.stringify(data)}\n`);
    }),
    fs.createWriteStream(filePath)
  );

  process.exit(0);
};

run();

Использование памяти невероятно увеличивается каждый раз при использовании данных. В таблице пользователей Spanner около 100 000 строк.

1 Ответ

0 голосов
/ 29 апреля 2020

Можете ли вы попробовать следующее?

     spannerDatabase
      .runStream({
        sql,
        params,
        json: true,
       gaxOptions: {
          pageSize: 1000,
       }
      })
...