Как задержать прямой эфир? - PullRequest
2 голосов
/ 08 марта 2019

Я пытаюсь создать сервис на Go, который задерживает прямой эфир (socketio / signalR) на ~ 7 минут. Также следует разрешить поток без задержки. Таким образом, служба Go должна иметь что-то вроде буфера или очереди, которая заставляет данные ждать заданную продолжительность, прежде чем их разрешат использовать. Как бы вы сделали что-то подобное в Go? Будет ли задержанный поток отдельным разделом? Какую структуру данных следует использовать для задержки данных?

Моя текущая идея состоит в том, чтобы использовать пакет time для ожидания / отметки в течение 7 минут, прежде чем данные будут разрешены к использованию, но это поведение блокировки может быть неоптимальным в этом сценарии.

Вот код, объясняющий, что я пытаюсь сделать. FakeStream - это фиктивная функция, которая имитирует живые потоковые данные, которые я получаю от внешней службы.

package main

import (
    "fmt"
    "time"
)

func DelayStream(input chan string, output chan string, delay string) {

    // not working for some reason
    // delayDuration, _ := time.ParseDuration(delay)
    // fmt.Println(delayDuration.Seconds())

    if delay == "5s" {
        fmt.Println("sleeping")
        time.Sleep(5 * time.Second)
    }
    data := <-input
    output <- data
}

func FakeStream(live chan string) {

    ticks := time.Tick(2 * time.Second)
    for now := range ticks {
        live <- fmt.Sprintf("%v", now.Format(time.UnixDate))
    }
}

func main() {
    liveData := make(chan string)
    delayedData := make(chan string)

    go FakeStream(liveData)
    go DelayStream(liveData, delayedData, "5s")

    for {
        select {
        case live := <-liveData:
            fmt.Println("live: ", live)
        case delayed := <-delayedData:
            fmt.Println("delayed: ", delayed)
        }
    }
}

По какой-то причине канал с задержкой выводит только один раз и не выводит ожидаемые данные. Он должен выводить первое на прямом канале, но это не так.

1 Ответ

3 голосов
/ 08 марта 2019

Вам нужен буфер достаточного размера. Для простых случаев может работать буферизованный канал Go.

Спросите себя - сколько данных нужно хранить во время этой задержки - у вас должен быть разумный верхний предел. Например, если ваш поток доставляет до N пакетов в секунду, то для задержки на 7 минут вам нужно хранить 420N пакетов.

Спросите себя - что произойдет, если за окно задержки поступит больше данных, чем ожидалось? Вы можете выбросить новые данные, или выбросить старые данные, или просто заблокировать входной поток. Какие из них возможны для вашего сценария? Каждый из них приводит к немного другому решению.

Спросите себя - как рассчитывается задержка? С момента создания потока? С момента поступления каждого пакета? Задержка для каждого пакета отдельно или только для первого пакета в потоке?

Вам нужно будет значительно сузить выбор дизайна здесь, чтобы разработать пример кода.

Для некоторого подмножества этих вариантов дизайна вот простой способ добавить задержку между каналами для каждого сообщения:


package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    // in is a channel of strings with a buffer size of 10
    in := make(chan string, 10)

    // out is an unbuffered channel
    out := make(chan string)

    // this goroutine forwards messages from in to out, ading a delay
    // to each message.
    const delay = 3 * time.Second
    go func() {
        for msg := range in {
            time.Sleep(delay)
            out <- msg
        }
        close(out)
    }()

    var wg sync.WaitGroup
    wg.Add(1)
    // this goroutine drains the out channel
    go func() {
        for msg := range out {
            fmt.Printf("Got '%s' at time %s\n", msg, time.Now().Format(time.Stamp))
        }
        wg.Done()
    }()

    // Send some messages into the in channel
    fmt.Printf("Sending '%s' at time %s\n", "joe", time.Now().Format(time.Stamp))
    in <- "joe"

    time.Sleep(2 * time.Second)
    fmt.Printf("Sending '%s' at time %s\n", "hello", time.Now().Format(time.Stamp))
    in <- "hello"

    time.Sleep(4 * time.Second)
    fmt.Printf("Sending '%s' at time %s\n", "bye", time.Now().Format(time.Stamp))
    in <- "bye"
    close(in)

    wg.Wait()
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...