Работа, которую вы описываете, вероятно, подходит как для очереди, так и для комбинации сервера очередей и заданий. Он также может работать как набор шагов MapReduce.
Для сервера заданий я рекомендую взглянуть на Gearman. Документация не впечатляет, но презентации отлично ее документируют, и модуль Python также довольно понятен.
По сути, вы создаете функции на сервере заданий, и эти функции вызываются клиентами через API. Функции могут быть вызваны либо синхронно, либо асинхронно. В вашем примере вы, вероятно, хотите асинхронно добавить задание «Начать обновление». Это выполнит любые подготовительные задачи, а затем асинхронно вызовет задание «Получить подписчиков». Это задание будет извлекать пользователей, а затем вызывать задание «Обновить подписчиков». При этом все задания «Получи избранное для пользователя А» и друзей будут отправлены одновременно, и синхронно будет ожидать результатов всех из них. Когда они все вернутся, он вызовет задание «Рассчитать новую очередь».
Этот подход, основанный только на сервере заданий, изначально будет несколько менее надежным, поскольку обеспечение правильной обработки ошибок, любых отключенных серверов и постоянства будет забавным.
Для очереди SQS - очевидный выбор. Он надежен, очень быстрый доступ из EC2 и дешевый. И намного проще в настройке и обслуживании, чем в других очередях, когда вы только начинаете.
По сути, вы помещаете сообщение в очередь, так же, как если бы вы отправляли задание на сервер заданий выше, за исключением того, что вы, вероятно, ничего не будете делать синхронно. Вместо того, чтобы выполнять «Получить избранное для пользователя A» и т. Д. Синхронно, вы сделаете их асинхронно, а затем получите сообщение, в котором говорится, что все они завершены. Вам понадобится какое-то постоянство (база данных SQL, с которой вы знакомы, или Amazon SimpleDB, если вы хотите полностью использовать AWS), чтобы отслеживать, выполнена ли работа - вы не можете проверить ход выполнения задания в SQS (хотя вы можете в других очередях). Сообщение, которое проверяет, все ли они закончены, выполнит проверку - если они не все закончены, ничего не делайте, и затем сообщение будет повторено через несколько минут (на основе visibility_timeout). В противном случае вы можете поместить следующее сообщение в очередь.
Этот подход только к очереди должен быть надежным, предполагая, что вы не используете сообщения очереди по ошибке, не выполняя работу. Подобную ошибку трудно сделать с SQS - вам действительно нужно попробовать. Не используйте автоматически использующие очереди или протоколы - если произойдет ошибка, вы не сможете гарантировать, что вы поместите сообщение о замене обратно в очередь.
В этом случае может быть полезна комбинация сервера очередей и сервера заданий. Вы можете обойтись без наличия постоянного хранилища для проверки хода выполнения задания - сервер заданий позволит вам отслеживать ход выполнения задания. Ваше сообщение «получить избранное для пользователей» может поместить все задания «получить избранное для пользователя A / B / C» на сервер заданий. Затем поместите в очередь сообщение «проверить все избранные выборки выполнено» со списком задач, которые необходимо выполнить (и достаточной информации для перезапуска любых заданий, которые таинственным образом исчезают).
Для бонусных баллов:
Сделать это как MapReduce должно быть довольно легко.
Ваша первая работа будет списком всех ваших пользователей. Карта будет принимать каждого пользователя, получать подписанных пользователей и выходные строки для каждого пользователя и его последующего пользователя:
"UserX" "UserA"
"UserX" "UserB"
"UserX" "UserC"
Шаг уменьшения идентичности оставит это без изменений. Это сформирует вход второго задания. Карта для второго задания получит избранное для каждой строки (вы можете использовать memcached для предотвращения выборки избранных для комбо UserX / UserA и UserY / UserA через API) и вывести строку для каждого избранного:
"UserX" "UserA" "Favourite1"
"UserX" "UserA" "Favourite2"
"UserX" "UserA" "Favourite3"
"UserX" "UserB" "Favourite4"
Шаг сокращения для этой работы преобразует это в:
"UserX" [("UserA", "Favourite1"), ("UserA", "Favourite2"), ("UserA", "Favourite3"), ("UserB", "Favourite4")]
В этот момент у вас может быть другое задание MapReduce для обновления базы данных для каждого пользователя с этими значениями, или вы можете использовать некоторые из инструментов, связанных с Hadoop, таких как Pig, Hive и HBase, для управления вашей базой данных дляyou.
Я бы порекомендовал использовать Cloudera Distribution для команд управления Hadoop ec2, чтобы создать и разрушить кластер Hadoop на EC2 (на их AMI настроен Python), и использовать что-то вроде Dumbo (на PyPI).для создания заданий MapReduce, поскольку он позволяет вам тестировать задания MapReduce на локальном компьютере / компьютере разработчика без доступа к Hadoop.
Удачи!