GCP PubSub - Как поставить в очередь асинхронное сообщение? - PullRequest
0 голосов
/ 28 марта 2019

Я хотел бы получить информацию о настройке издателя в среде pubsub gcp.Я хотел бы поставить в очередь сообщения, которые будут потребляться через функцию Google.Для этого публикация будет запускаться при достижении определенного количества сообщений или за определенное время.

Я задаю тему следующим образом:

topic.PublishSettings = pubsub.PublishSettings{
        ByteThreshold:  1e6, // Publish a batch when its size in bytes reaches this value. (1e6 = 1Mo)
        CountThreshold: 100, // Publish a batch when it has this many messages.
        DelayThreshold: 10 * time.Second, // Publish a non-empty batch after this delay has passed.
    }

Когда я вызываю функцию публикации,У меня задержка 10 секунд на каждый звонок.Сообщения не добавляются в очередь ...

for _, v := range list {
    ctx := context.Background()
    res := a.Topic.Publish(ctx, &pubsub.Message{Data: v})

    // Block until the result is returned and a server-generated
    // ID is returned for the published message.
    serverID, err = res.Get(ctx)
    if err != nil {
        return "", err
    }
}

Кто-нибудь может мне помочь?

Приветствия

1 Ответ

1 голос
/ 29 марта 2019

Пакетная обработка на стороне издателя разработана для обеспечения большей эффективности затрат при отправке сообщений в Google Cloud Pub / Sub.Учитывая, что минимальная единица выставления счетов за услугу составляет 1 КБ, может быть дешевле отправлять несколько сообщений в одном запросе публикации.Например, отправка двух сообщений размером 0,5 КБ в виде отдельных запросов на публикацию приведет к изменению отправки 2 КБ данных (1 КБ для каждого).Если его объединить в один запрос на публикацию, он будет стоить 1 КБ данных.

Компромисс с пакетированием - задержка: чтобы заполнить пакеты, издатель должен ждать, чтобы получить больше сообщений.пакетировать вместе.Три свойства пакетной обработки (ByteThreshold, CountThreshold и DelayThreshold) позволяют контролировать уровень этого компромисса.Первые два свойства определяют, сколько данных или сколько сообщений мы помещаем в один пакет.Последнее свойство определяет, как долго издатель должен ждать отправки пакета.

В качестве примера представьте, что для параметра CountThreshold установлено значение 100. Если вы публикуете несколько сообщений, может потребоваться некоторое время, чтобы получить 100 сообщений для отправки в виде пакета.Это означает, что задержка для сообщений в этом пакете будет выше, поскольку они находятся на клиенте и ожидают отправки.Если для параметра DelayThreshold установлено значение 10 секунд, это означает, что пакет будет отправлен, если в нем содержится 100 сообщений или , если первое сообщение в пакете было получено не менее 10 секунд назад.Следовательно, это накладывает ограничение на величину задержки, которую необходимо ввести, чтобы иметь больше данных в отдельном пакете.

Код, который у вас есть, приведет к пакетам только с одним сообщением, которое каждоезаймет 10 секунд, чтобы опубликовать.Причиной является вызов res.Get(ctx), который будет блокироваться до тех пор, пока сообщение не будет успешно отправлено на сервер.Если для параметра CountThreshold установлено значение 100, а для параметра DelayThreshold установлено значение 10 секунд, последовательность, которая происходит внутри цикла:

  1. Вызов Publish помещает сообщение в пакет для публикации.
  2. Этот пакет ожидает получения еще 99 сообщений или в течение 10 секунд, прежде чем отправить пакет на сервер.
  3. Код ожидает отправки этого сообщения на сервер и возврата с serverID.
  4. Поскольку код не вызывает Publish снова, пока не вернется res.Get(ctx), он ожидает 10 секунд для отправки пакета.
  5. res.Get(ctx) возвращается с serverID дляодиночное сообщение.
  6. Вернитесь к 1.

Если вы действительно хотите пакетировать сообщения вместе, вы не можете позвонить res.Get(ctx) до следующего Publish вызова.Вы можете либо вызвать публикацию в goroutine (по одной подпрограмме на сообщение), либо скопировать объекты res в списке, а затем вызвать Get для них вне цикла, например:

    var res []*PublishResult
    ctx := context.Background()
    for _, v := range list {
        res = append(res, a.Topic.Publish(ctx, &pubsub.Message{Data: v}))
    }
    for _, r := range res  {
        serverID, err = r.Get(ctx)
        if err != nil {
            return "", err
        }
    }

Следует иметь в виду, что пакетная обработка оптимизирует затраты на стороне публикации, а не на стороне подписки.Облачные функции построены с push-подписками .Это означает, что сообщения должны доставляться абоненту по одному за раз (поскольку код ответа используется для подтверждения каждого сообщения), что означает, что пакетные сообщения, доставляемые абоненту, отсутствуют.

...