Как реализовать барьер для последовательных асинхронных операций Objectify save () в цикле - PullRequest
0 голосов
/ 28 января 2019

У нашей команды есть конвейер GAE, в котором GenerateJob может генерировать и сохранять огромное количество MyEntity в Datastore.В настоящее время операции save в GenerateJob выполняются последовательно и синхронно с использованием ofy().save().entity().now() в цикле for.Мы хотели бы улучшить время выполнения, изменив цикл на ofy().save().entity() для асинхронной обработки.

Однако позже в конвейере у нас есть другая работа, которая читает все эти записи и обрабатывает их.Как мы можем создать барьер для ожидания всех операций save?Будет ли ObjectifyFilter.complete() работать в этом случае?Как то так:

public class JobA {
  public void generate(MyInput input) {
    for (MyEntity entity : processInput(input)) {
      // ... Additional work for entity.

      addResult(entity);
    }

    // Will ObjectifyFilter.complete() work as a barrier here?
  }

  public void addResult(MyEntity entity) {
    // Change to ofy().save().entity(entity) for async ops?
    ofy().save().entity(entity).now();
  }
}

public void JobB {
  public void someWorkOnMyEntities() {
    // Need to make sure all the previous save() operations are finished.
    Iterable<MyEntity> entities = ofy().load().type(MyEntity.class);
    // ... processing logic.
  }
}

1 Ответ

0 голосов
/ 29 января 2019

Сначала взгляните на ofy().defer().save().Это объединит кучу операций сохранения в одну массовую операцию в конце вашей транзакции (или сеанса).Вы также можете периодически вызывать flush(), если хотите завершить текущий процесс.

Однако выяснить, как создать какой-либо барьер записи между заданиями на серверах в распределенной системе, нетривиально.Objectify не предоставляет никакой помощи, но Objectify является низкоуровневым API.Вам понадобится что-то на более высоком уровне, которое понимает, как работает ваша очередь на работу.

...