У меня есть небольшое задание потока данных, запущенное из облачной функции с использованием шаблона потока данных. Задание в основном читает из таблицы в Bigquery, преобразует результирующую таблицу в значение ключа и записывает значение ключа в хранилище данных.
Вот как выглядит мой код: -
PCollection<TableRow> bigqueryResult = p.apply("BigQueryRead",
BigQueryIO.readTableRows().withTemplateCompatibility()
.fromQuery(options.getQuery()).usingStandardSql()
.withoutValidation());
bigqueryResult.apply("WriteFromBigqueryToDatastore", ParDo.of(new DoFn<TableRow, String>() {
@ProcessElement
public void processElement(ProcessContext pc) {
TableRow row = pc.element();
Datastore datastore = DatastoreOptions.getDefaultInstance().getService();
KeyFactory keyFactoryCounts = datastore.newKeyFactory().setNamespace("MyNamespace")
.setKind("MyKind");
Key key = keyFactoryCounts.newKey("Key");
Builder builder = Entity.newBuilder(key);
builder.set("Key", BooleanValue.newBuilder("Value").setExcludeFromIndexes(true).build());
Entity entity= builder.build();
datastore.put(entity);
}
}));
Этот конвейер работает нормально, когда число записей, которые я пытаюсь обработать, находится в диапазоне от 1 до 100. Однако, когда я пытаюсь увеличить нагрузку на конвейер, т. Е. ~ 10000 записей, конвейер не масштабируется ( даже если для автоматического масштабирования задано значение THROUGHPUT, а для MaximumWorkers задано значение 50 (тип машины n1-standard-1). Работа продолжает обрабатывать 3 или 4 элемента в секунду с одним или двумя рабочими. Это влияет на производительность моей системы.
Любые советы о том, как увеличить производительность, очень приветствуются.
Заранее спасибо.