Почему у меня есть «недоставленные сообщения», если мой приемник работает? - PullRequest
0 голосов
/ 29 мая 2019

stackdriver undelivered messages

Я перезапустил свою виртуальную машину Windows, но это не помогло.На следующий день я перезапустил мой main.go, и сразу после этого я увидел, что старые застрявшие сообщения начали приходить.

Мой Subscription Type - это Pull, мой Acknowledgement Deadline - максимум: 600 секунд.

Справочная информация. Я хочу использовать Pubsub в качестве балансировщика нагрузки в моей управляемой группе экземпляров Windows (мне нужен Windows API для этой задачи).Обработка сообщений требует значительных ресурсов процессора (с несколькими HTTP-вызовами) и может занять от нескольких секунд до нескольких минут.

Некоторые другие показатели из Stackdriver: stackdriver pubsub stackdriver oldest unacknowledged message stackdriver backlog size

Понятия не имею, что я могу проверить.День назад я провел высоконагруженное тестирование и, похоже, все было в порядке (Undelivered Messages было нулевым, как мы видим на первом скриншоте выше).Теперь загрузка моего процессора равна нулю, управляемая группа уменьшилась до одного экземпляра (это не в производственной среде).Я пытаюсь использовать Pubsub в первый раз.Код моего main(), который синтезирует аудио из фрагментов текста, кодирует в два формата и загружает в S3:

func main() {
    fmt.Printf("Current time: %v\n",
        time.Now().Format(timeFormat),
    )

    // https://godoc.org/cloud.google.com/go/pubsub#Subscription.Receive
    err := subscription.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
        timeStart := time.Now()
        if timeStartFirstMessage == nil {
            timeStartFirstMessage = &timeStart
        }

        var data pubsubMessage
        json.Unmarshal(m.Data, &data)
        fmt.Printf("Got message: %#v\n", data)

        var wg sync.WaitGroup
        wg.Add(len(data.TextChunks))

        wavs := make([][]byte, len(data.TextChunks))
        for i, chunk := range data.TextChunks {
            /** TODO without 'go' (sequential) will process in correct order:
             * user can listen first and seconds chunks faster,
             * but m4a will be later,
             * but on "high load" of the VM with ~2x more messages in parallel
             * (each message on it is own goroutine)
             * performance may be even better (I think) because
             * hundreds of CPU intensive goroutine is worse.
             *
             * Also in sequential code I can append() to wav[]
             * instead of doind it later in another func,
             * maybe this will also improve perfomance (profile).
            **/
            go func(i int, chunk string) {
                fmt.Printf("Index of text chunk: %d\n", i)
                wav := tts(data.Voice, chunk)
                streamOfOggEncoder := encodeOggVorbis(wav)
                uploadToS3(
                    "intelligentspeaker",
                    data.Sub+"/"+data.HashArticle+"/"+fmt.Sprint(i),
                    "audio/ogg",
                    streamOfOggEncoder,
                )
                wavs[i] = wav

                wg.Done()
            }(i, chunk)
        }
        wg.Wait()
        wavConcated := concat(wavs)

        filename := buildPodcastEpisodeFilename(data.Title)
        err := encodePodcastEpisode(wavConcated, filename)
        if err != nil {
            m.Nack()
            return
        }

        if err != nil {
            logger.Log(logging.Entry{Payload: fmt.Sprintf("ERROR on m4a deleting: %v", err)})
        }

        key := data.Sub + "/" + data.HashArticle + "/" + random() + "/" + filename
        readCloser, size := getReadCloserAndSize(filename)
        if readCloser == nil {
            m.Nack()
            return
        }
        uploadToS3("intelligentspeaker--podcasts", key, "audio/x-m4a", readCloser)
        // Next message may be with the same title (filename)
        err = os.Remove(filename)

        fmt.Printf("Duration: %v\n", duration(wavConcated))
        updatePodcastXML(
            key,
            data.Title,
            data.URLArticle,
            data.FavIconURL,
            duration(wavConcated),
            data.Utc,
            size,
        )

        fmt.Printf("DONE pubsub message, current time: %v\n", time.Now().Format(timeFormat))
        fmt.Printf("Time of message processing: %v\n", time.Since(timeStart).Round(time.Second))
        fmt.Printf(
            "Time of all messages processing (counted from the first message, not from the start of this app): %v\n",
            time.Since(*timeStartFirstMessage).Round(time.Second),
        )

        m.Ack()

    })
    if err != nil {
        logger.Log(logging.Entry{Payload: fmt.Sprintf("ERROR on registering receiver: %v", err)})
    }

}

Обновление: найдено аналогичный вопрос .

1 Ответ

0 голосов
/ 30 мая 2019

Я предполагаю, что соответствующий показатель - pubsub.googleapis.com/subscription/num_undelivered_messages, который измеряет «Количество неподтвержденных сообщений (a.k.a. backlog messages) в подписке». Из графика «Подтверждение запросов» мы видим, что ваши подписчики перестали подтверждать сообщение около 21:00 28-го числа. В это же время возраст «самого старого неподтвержденного сообщения» начал расти (линейно). В то же время график "Backlog Size" довольно плоский. Это означает, что в резерве подписки имеется небольшое количество «проблемных» сообщений, которые подписчики не подтверждают. Скорее всего, по какой-то причине у подписчиков возникают проблемы с обработкой этого небольшого набора сообщений. Возможно, эти сообщения каким-то образом искажены или не соответствуют ожиданиям абонентского кода.

Одна вещь, которую вы можете сделать, чтобы попытаться отладить это, это использовать инструмент командной строки gcloud, чтобы «заглянуть» в сообщения в списке ожидания подписки: https://cloud.google.com/sdk/gcloud/reference/pubsub/subscriptions/pull. Обратите внимание, что gcloud tool не будет подтверждать какие-либо сообщения, если вы не установите флаг --auto-ack.

...