В моей базе данных есть куча объектов Feed, и я пытаюсь обновлять каждый Feed каждый час. Моя проблема здесь заключается в том, что мне нужно убедиться, что нет повторяющихся обновлений - это должно происходить не чаще одного раза в час, но я также не хочу, чтобы каналы ждали обновления в течение двух часов. (Это нормально, если это происходит каждый час +/- несколько минут, но дважды за несколько минут это плохо.)
Я использую Django и Celery с Amazon SQS в качестве брокера. У меня есть код обновления фида, настроенный как задача Celery, но я не могу найти способ предотвратить дублирование, оставаясь совместимым с Celery, работающим на нескольких узлах.
Мое текущее решение состоит в том, чтобы добавить атрибут last_update_scheduled
к модели ленты и запускать следующую задачу каждые 5 минут (псевдокод):
threshold = datetime.now() - timedelta(seconds=3600)
for f in Feed.objects.filter(Q(last_update_scheduled__lt = threshold) |
Q(last_update_scheduled = None)):
updateFeed.delay(f)
f.last_update_scheduled = now
f.save()
Это подвержено ряду проблем синхронизации. Например, если резервное копирование моих очередей задач может выполняться одновременно, эта задача может выполняться дважды, что приведет к дублированию обновлений. Я видел несколько решений для этого (например, рецепт сельдерея и адаптация при переполнении стека ), но решение memcached не является надежным, например, Дубликаты могут возникнуть при перезапуске memcached или в случае нехватки памяти и очистки старых данных. Не говоря уже о том, что я не хотел бы добавлять memcached в мою производственную конфигурацию только для простой блокировки.
В идеальном мире я бы хотел сказать:
@modelTask(Feed, run_every=3600)
def updateFeed(feed):
# do something expensive
Но пока мое воображение подводит меня к тому, как реализовать этот декоратор.