Задание шаблона потока данных Google не масштабируется при записи записей в хранилище данных Google - PullRequest
0 голосов
/ 02 мая 2018

У меня есть небольшое задание потока данных, запущенное из облачной функции с использованием шаблона потока данных. Задание в основном читает из таблицы в Bigquery, преобразует результирующую таблицу в значение ключа и записывает значение ключа в хранилище данных.

Вот как выглядит мой код: -

PCollection<TableRow> bigqueryResult = p.apply("BigQueryRead",
                BigQueryIO.readTableRows().withTemplateCompatibility()
                        .fromQuery(options.getQuery()).usingStandardSql()
                        .withoutValidation());

bigqueryResult.apply("WriteFromBigqueryToDatastore", ParDo.of(new DoFn<TableRow, String>() {                
            @ProcessElement
            public void processElement(ProcessContext pc) {
                TableRow row = pc.element();

                Datastore datastore = DatastoreOptions.getDefaultInstance().getService();
                KeyFactory keyFactoryCounts = datastore.newKeyFactory().setNamespace("MyNamespace")
                        .setKind("MyKind");

                Key key = keyFactoryCounts.newKey("Key");
                Builder builder =   Entity.newBuilder(key);
                builder.set("Key", BooleanValue.newBuilder("Value").setExcludeFromIndexes(true).build());   

                Entity entity= builder.build();
                datastore.put(entity);
            }
        }));

Этот конвейер работает нормально, когда число записей, которые я пытаюсь обработать, находится в диапазоне от 1 до 100. Однако, когда я пытаюсь увеличить нагрузку на конвейер, т. Е. ~ 10000 записей, конвейер не масштабируется ( даже если для автоматического масштабирования задано значение THROUGHPUT, а для MaximumWorkers задано значение 50 (тип машины n1-standard-1). Работа продолжает обрабатывать 3 или 4 элемента в секунду с одним или двумя рабочими. Это влияет на производительность моей системы.

Любые советы о том, как увеличить производительность, очень приветствуются. Заранее спасибо.

Ответы [ 2 ]

0 голосов
/ 03 мая 2018

Нашел решение, используя DatastoreIO вместо клиента хранилища данных. Ниже приведен фрагмент, который я использовал,

    PCollection<TableRow> row = p.apply("BigQueryRead",
                BigQueryIO.readTableRows().withTemplateCompatibility()
                        .fromQuery(options.getQueryForSegmentedUsers()).usingStandardSql()
                        .withoutValidation());          

    PCollection<com.google.datastore.v1.Entity> userEntity = row.apply("ConvertTablerowToEntity", ParDo.of(new DoFn<TableRow, com.google.datastore.v1.Entity>() {

        @SuppressWarnings("deprecation")
        @ProcessElement
        public void processElement(ProcessContext pc) {
            final String namespace = "MyNamespace";
            final String kind = "MyKind";

            com.google.datastore.v1.Key.Builder keyBuilder = DatastoreHelper.makeKey(kind, "root");
            if (namespace != null) {
              keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
            }
            final com.google.datastore.v1.Key ancestorKey = keyBuilder.build();

            TableRow row = pc.element();
            String entityProperty = "sample";

            String key = "key";

            com.google.datastore.v1.Entity.Builder entityBuilder = com.google.datastore.v1.Entity.newBuilder();
            com.google.datastore.v1.Key.Builder keyBuilder1 = DatastoreHelper.makeKey(ancestorKey, kind, key);
            if (namespace != null) {
                keyBuilder1.getPartitionIdBuilder().setNamespaceId(namespace);
              }

              entityBuilder.setKey(keyBuilder1.build());
              entityBuilder.getMutableProperties().put(entityProperty, DatastoreHelper.makeValue("sampleValue").build());
              pc.output(entityBuilder.build());             
        }

    }));

    userEntity.apply("WriteToDatastore", DatastoreIO.v1().write().withProjectId(options.getProject()));

Это решение способно масштабироваться от 3 элементов в секунду с 1 рабочим до ~ 1500 элементов в секунду с 20 рабочими.

0 голосов
/ 03 мая 2018

По крайней мере, с клиентской библиотекой Python ndb можно записать до 500 объектов одновременно за один вызов .put_multi() хранилища данных - намного быстрее, чем вызов .put() для одного объекта за раз (вызовы блокируют базовые RPC)

Я не пользователь java, но похоже, что подобная техника также доступна для него. От Использование пакетных операций :

Вы можете использовать пакетные операции, если хотите работать с несколькими сущности в одном вызове Cloud Datastore.

Вот пример пакетного вызова:

Entity employee1 = new Entity("Employee");
Entity employee2 = new Entity("Employee");
Entity employee3 = new Entity("Employee");
// ...

List<Entity> employees = Arrays.asList(employee1, employee2, employee3);
datastore.put(employees);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...