Асинхронная запись в двоичный объект appengine и его завершение после завершения всех задач - PullRequest
1 голос
/ 25 января 2012

У меня сложная проблема.

Я перебираю набор URL-адресов, параметризованных по дате, и извлекаю их.Например, вот пример одного из них:

somewebservice.com? Start = 01-01-2012 & end = 01-10-2012

Иногда содержимое, возвращаемое с URL-адреса, усекается (отсутствуют случайные результаты с приложенным сообщением об усеченной ошибке), поскольку я определил слишком большой диапазон, поэтому мне нужно разделить запрос на два URL-адреса

somewebservice.com? start = 01-01-2012 & end =01-05-2012

somewebservice.com? Start = 01-06-2012 & end = 01-10-2012

Я делаю это рекурсивно, пока результаты больше не усекаются, а затем язапись в большой двоичный объект, который допускает одновременную запись.

Каждый из этих вызовов извлечения URL-адресов / запись большого двоичного объекта обрабатывается в отдельной задаче очереди задач.

Проблема в том, что я не могу дляЖизнь меня разработала схему, чтобы знать, когда все задачи были выполнены.Я пытался использовать заштрихованные счетчики, но рекурсия затрудняет.Кто-то предложил мне использовать Pipeline API, поэтому я наблюдал, как Слаткин разговаривает 3 раза.Похоже, он не работает с рекурсией (но я признаю, что до сих пор не до конца понимаю библиотеку).

Есть ли какой-нибудь способ узнать, когда завершен набор задач из очереди задач (и дочерних элементов, которые порождаются рекурсивно), чтобы я мог завершить создание своего большого двоичного объекта и делать с ним что угодно?

Спасибо, Джон

Ответы [ 3 ]

2 голосов
/ 25 января 2012

Вы прочитали Документы по началу работы с ? Конвейеры могут создавать другие конвейеры и ждать на них, поэтому делать то, что вы хотите, довольно просто:

class RecursivePipeline(pipeline.Pipeline):
  def run(self, param):
    if some_condition: # Too big to process in one
      p1 = yield RecursivePipeline(param1)
      p2 = yield RecursivePipeline(param2)
      yield RecursiveCombiningPipeline(p1, p2)

Где RecursiveCombiningPipeline просто действует как получатель для значений двух под-конвейеров.

0 голосов
/ 26 января 2012

Хорошо, вот что я сделал. Мне пришлось немного изменить решение Митча, но он определенно дал мне верное направление с советом вернуть будущее значение вместо немедленного.

Я должен был создать промежуточный DummyJob, который принимает выходные данные рекурсии

   public static class DummyJob extends Job1<Void, List<Void>> {
      @Override
      public Value<Void> run(List<Void> dummies) {
         return null;
      }
   }

Затем я отправляю вывод DummyJob в финализатор Blob в ожидании

List<FutureValue<Void>> dummies = new ArrayList<FutureValue<Void>>();
for (Interval in : ins) {
   dummies.add(futureCall(new DataFetcher(), immediate(file), immediate(in.getStart()),
         immediate(in.getEnd())));
}

FutureValue<Void> fv = futureCall(new DummyJob(), futureList(dummies));

return futureCall(new DataWriter(), immediate(file), waitFor(fv));

Спасибо, Митч и Ник !!

0 голосов
/ 25 января 2012

Вот пример использования Java Pipeline

пакет com.example;

import com.google.appengine.tools.pipeline.FutureValue;
import com.google.appengine.tools.pipeline.Job1;
import com.google.appengine.tools.pipeline.Job2;
import com.google.appengine.tools.pipeline.Value;

public class PipelineRecursionDemo {

  /**
   * A Job to count the number of letters in a word
   * using recursion
   */
  public static class LetterCountJob extends Job1<Integer, String> {

    public Value<Integer> run(String word) {
      int length = word.length();
      if (length < 2) {
        return immediate(word.length());
      } else {
        int mid = length / 2;
        FutureValue<Integer> first = futureCall(new LetterCountJob(),
            immediate(word.substring(0, mid)));
        FutureValue<Integer> second = futureCall(new LetterCountJob(),
            immediate(word.substring(mid, length)));
        return futureCall(new SumJob(), first, second);
      }
    }
  }

  /**
   * An immediate Job to add two integers
   */
  public static class SumJob extends Job2<Integer, Integer, Integer> {

    public Value<Integer> run(Integer x, Integer y) {
      return immediate(x + y);
    }
  }
}
...