Google Cloud Datastore: подсчет всех объектов в определенном состоянии - PullRequest
0 голосов
/ 16 января 2019

Фон

Мне нужно отправить большую партию уведомлений примерно на ~ 1 млн устройств, и я создаю ее с помощью облачных функций Google.

В текущей настройке я ставлю каждый маркер устройства как сообщение PubSub, которое:

  • хранит ожидающее уведомление в хранилище данных, которое используется для отслеживания попыток и статуса успеха
  • пытается отправить уведомление
  • помечает уведомление как успешное или неуспешное, если оно достаточно повторено и не прошло

Это работает более-менее нормально, и я получаю при этом приличную производительность, что-то 1.5K токенов, обрабатываемых в секунду.

Выпуск

Я хочу отслеживать текущий прогресс всей работы. Учитывая, что я знаю, сколько уведомлений, которые я ожидаю обработать, я хочу сделать так, чтобы иметь возможность сообщать что-то вроде обработанной x / 1_000_000, а затем считать, что это сделано, когда сумма неудач + успехов равна той, которую я хотел обработать.

Документация DataStore предлагает не выполнять подсчет самих сущностей, поскольку он не будет работать, что я могу подтвердить. Я реализовал счетчик, следуя их примеру документации осколочного счетчика , который я включаю в конце.

Проблема, с которой я сталкиваюсь, заключается в том, что она довольно медленная и очень склонна к возврату 409 Contention error s, что заставляет мои вызовы функций повторяться, что не идеально, учитывая, что сам подсчет не является существенным для процесса, и есть только ограниченный бюджет повторных попыток для каждого уведомления. На практике больше всего не получается увеличить счетчик, который происходит в конце процесса, что увеличило бы нагрузку при чтении уведомлений, чтобы проверить их состояние при повторных попытках, и означает, что я получаю счетчик, который меньше, чем фактические успешные уведомления .

Я провел быстрый бенчмарк с помощью wrk и, похоже, получил около 400 RPS из-за увеличения счетчика со средней задержкой 250 мс. Это довольно медленно по сравнению с самой логикой уведомлений, которая выполняет около 3 запросов к хранилищу данных на одно уведомление, и, вероятно, является более сложной, чем увеличение счетчика. При добавлении к конфликтным ошибкам я получаю реализацию, которую не считаю стабильной. Я понимаю, что Datastore обычно автоматически масштабируется при постоянном интенсивном использовании, но схема использования этого сервиса очень редка и для всей партии токенов, поэтому не было бы никакого предыдущего трафика, чтобы масштабировать это.

Вопросы

  • Что-то мне не хватает в реализации счетчика, которую можно улучшить, чтобы сделать ее менее медленной?
  • Есть ли другой подход, который я должен рассмотреть, чтобы получить то, что я хочу?

Код

Код, взаимодействующий с хранилищем данных

DATASTORE_READ_BATCH_SIZE = 100

class Counter():
    kind = "counter"
    shards = 2000

    @staticmethod
    def _key(namespace, shard):
        return hashlib.sha1(":".join([str(namespace), str(shard)]).encode('utf-8')).hexdigest()

    @staticmethod
    def count(namespace):
        keys = []
        total = 0
        for shard in range(Counter.shards):
            if len(keys) == DATASTORE_READ_BATCH_SIZE:
                counters = client.get_multi(keys)
                total = total + sum([int(c["count"]) for c in counters])
                keys = []
            keys.append(client.key(Counter.kind, Counter._key(namespace, shard)))

        if len(keys) != 0:
            counters = client.get_multi(keys)
            total = total + sum([int(c["count"]) for c in counters])

        return total

    @staticmethod
    def increment(namespace):
        key = client.key(Counter.kind, Counter._key(namespace, random.randint(0, Counter.shards - 1)))
        with client.transaction():
            entity = client.get(key)
            if entity is None:
                entity = datastore.Entity(key=key)
                entity.update({
                    "count": 0,
                })
            entity.update({
                "count": entity["count"] + 1,
            })
            client.put(entity)

Это вызывается из облачной функции Google примерно так

from flask import abort, jsonify, make_response
from src.notify import FCM, APNS
from src.lib.datastore import Counter

def counter(request):
    args = request.args

    if args.get("platform"):
        Counter.increment(args["platform"])
        return

    return jsonify({
        FCM: Counter.count(FCM),
        APNS: Counter.count(APNS)
    })

Используется как для увеличения, так и для считывания, и делится по платформам для iOS и Android.

1 Ответ

0 голосов
/ 19 января 2019

В итоге я разочаровался и начал также сохранять статус уведомлений в BigQuery. Цены по-прежнему разумны, поскольку они все еще используются, а потоковая версия вставки данных кажется достаточно быстрой, чтобы на практике у меня не возникало проблем.

При этом я могу использовать простой SQL-запрос для подсчета всех объектов, соответствующих пакетному заданию. В итоге на все объекты уходит примерно 3 секунды, что по сравнению с альтернативой является приемлемой для меня производительностью, учитывая, что это только для внутреннего использования.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...