Есть ли способ обеспечить поведение FIFO (первым пришел, первым вышел) с очередями задач в GAE? - PullRequest
7 голосов
/ 02 апреля 2012

Есть ли способ обеспечить поведение FIFO (первым пришел, первым вышел) с очередями задач в GAE?

Документация GAE гласит, что FIFO является одним из факторов, влияющих на порядок выполнения задач, но в той же документации говорится, что «планирование системы может« перепрыгивать »новые задачи в начало очереди», и я подтвердил это поведение тестовое задание. Эффект: мои события обрабатываются не по порядку.

Документы говорят:

https://developers.google.com/appengine/docs/java/taskqueue/overview-push

Порядок выполнения задач зависит от нескольких факторов:

Положение задачи в очереди. App Engine пытается обрабатывать задачи в порядке FIFO> (первым пришел, первым вышел). Как правило, задачи вставляются в конец очереди, и казнен из головы очереди.

Отставание задач в очереди. Система пытается обеспечить минимальную задержку возможно для любой заданной задачи с помощью специально оптимизированных уведомлений планировщику. Таким образом, в случае, когда очередь имеет большое отставание задач, Системное планирование может «перепрыгивать» новые задачи в начало очереди .

Значение свойства etaMillis задачи. Это свойство определяет самое раннее время, когда задача может быть выполнена. App Engine всегда ждет, пока после указанного ETA обрабатывать push-задачи.

Значение свойства countdownMillis задачи. Это свойство определяет минимум количество секунд ожидания перед выполнением задачи. Обратный отсчет и эта являются взаимоисключающими; если вы укажете один, не указывайте другой.

Что мне нужно сделать? В моем случае я буду обрабатывать 1-2 миллиона событий в день, приходящих с транспортных средств. Эти события могут быть отправлены с любым интервалом (1 секунда, 1 минута или 1 час). Порядок обработки события должен быть гарантирован. Мне нужен процесс по метке времени, который генерируется на встроенном устройстве внутри автомобиля.

Что у меня сейчас?

  1. Сервлет Rest, вызываемый потребителем и создающий задачу (данные события находятся в полезной нагрузке).

  2. После этого рабочий сервлет получает эту задачу и:

    • Десериализация данных события;

    • Поместить событие в хранилище данных;

    • Обновление автомобиля в хранилище данных.

Итак, опять же, есть ли способ обеспечить только поведение FIFO? Или как я могу улучшить это решение, чтобы получить это?

Ответы [ 7 ]

4 голосов
/ 03 апреля 2012

Вы должны подойти к этому с тремя отдельными шагами:

  1. Реализация Счетчика Sharding для генерации монотонно увеличивающегося ID.Как бы мне не хотелось использовать timestamp с сервера Google для обозначения порядка задач, похоже, что временные метки между серверами GAE могут отличаться от того, что вам требуется.

  2. Добавьте ваши задачиPull Queue вместо Push Queue.При построении TaskOption добавьте ID, полученный на шаге 1, в качестве тега .После добавления задачи сохраните ID где-нибудь в вашем хранилище данных.

  3. Пусть ваш рабочий сервлет арендует Tasks по определенному тегу из Pull Queue,Запросите хранилище данных, чтобы получить самый ранний идентификатор, который вам нужно выбрать, и используйте ID в качестве аренды tag.Таким образом, вы можете смоделировать поведение FIFO для вашей очереди задач.

После завершения обработки удалите ID из хранилища данных и не забудьте удалить Task от вашего Pull Queue тоже.Кроме того, я бы порекомендовал вам запускать ваши задачи в Backend.

ОБНОВЛЕНИЕ: Как отметили Ник Джонсон и mjaggard, шардинг на шаге № 1, по-видимому, не жизнеспособен для генерации монотонно увеличивающихся идентификаторов, итогда понадобятся другие источники удостоверений личности.Кажется, я помню, что вы использовали временные метки, сгенерированные вашими транспортными средствами, возможно ли использовать их вместо монотонно увеличивающегося идентификатора?

Независимо от способа генерации идентификаторов, основная идея заключается в использовании хранилища данных.механизм запросов для создания порядка FIFO Tasks и использования задачи Tag для извлечения конкретной задачи из TaskQueue.

Однако есть предостережение.Из-за возможной политики чтения согласованности в хранилищах данных с высокой репликацией, если вы выберете HRD в качестве хранилища данных (и вам следует отказаться от M / S по состоянию на 4 апреля 2012 г.), могут быть некоторые устаревшие данные, возвращаемые запросом наШаг № 2.

3 голосов
/ 11 апреля 2012

Хорошо. Вот как я это сделал.

1) Rest servlet that is called from the consumer:

    If Event sequence doesn't match Vehicle sequence (from datastore)

        Creates a task on a "wait" queue to call me again

    else

       State validation

       Creates a task on the "regular" queue (Event data is on payload).


2) A worker servlet gets the task from the "regular" queue, and so on... (same pseudo code)

Таким образом, я могу приостановить «обычную» очередь для обслуживания данных без потери событий.

Спасибо за ваши ответы. Мое решение - их смесь.

3 голосов
/ 02 апреля 2012

Я думаю, что простой ответ - «нет», хотя частично, чтобы помочь улучшить ситуацию, я использую очередь извлечения - извлекаю 1000 задач за раз, а затем сортирую их.Если сроки не важны, вы можете отсортировать их и поместить в хранилище данных, а затем завершить пакет за раз.Вам все еще нужно решить, что делать с задачами в начале и в конце пакета - потому что они могут быть не в порядке с чередованием задач в других пакетах.

0 голосов
/ 05 апреля 2012

Следуя этой теме, мне неясно, является ли строгое требование FIFO для всех полученных транзакций или для каждого транспортного средства.У последних больше вариантов, чем у прежних.

0 голосов
/ 05 апреля 2012

Лучший способ справиться с этим, распределенным способом или «способом App Engine», вероятно, состоит в том, чтобы модифицировать ваш алгоритм и сбор данных для работы только с отметкой времени, позволяя произвольно упорядочивать задачи.

Предполагая, что это невозможно или слишком сложно, вы можете изменить свой алгоритм следующим образом:

  1. при создании задачи не помещайте данные в полезную нагрузку, а в хранилище данных, в виде с порядком меток времени и сохраняйте как дочерний объект любого объекта, который вы пытаетесь обновить (GTule?) , Отметки времени должны исходить от клиента, а не от сервера, чтобы гарантировать тот же порядок.

  2. запустить обобщенную задачу, которая извлекает данные для первой отметки времени, обрабатывает их и затем удаляет их внутри транзакции.

0 голосов
/ 03 апреля 2012

Сам не знаю ответа, но может быть так, что задачи, поставленные в очередь с использованием отложенной функции, могут выполняться в представленном порядке.Скорее всего, вам понадобится инженер из Г., чтобы получить ответ.Очереди вытягивания, как предложено, кажутся хорошей альтернативой, плюс это позволит вам рассмотреть пакетирование ваших пут ().

Одно замечание о заштрихованных счетчиках: они увеличивают вероятность монотонно увеличивающихся идентификаторов, но не гарантируют их.

0 голосов
/ 03 апреля 2012

Вы можете поместить работу, которую нужно выполнить, в строку в хранилище данных с отметкой времени создания, а затем извлекать рабочие задачи по этой отметке времени, но если ваши задачи создаются слишком быстро, вы столкнетесь с проблемами задержки.

...