Подождите N элементов в канале, прежде чем выполнять последовательно - PullRequest
0 голосов
/ 12 ноября 2018

Так что я очень новичок! Но у меня была идея о том, что я хотел попробовать.

Я хотел бы иметь процедуру go, которая принимает строки из канала, но только после того, как она получила N строк, должна выполняться на них.

Я искал похожие вопросы или случаи, но нашел только те, в которых идея заключалась в том, чтобы выполнить несколько подпрограмм параллельно и ждать, чтобы скомпилировать результат.

Я думал об идее создания массива и просто передать его в процедуру, где длина была достаточной. Однако я хочу сохранить определенное разделение интересов и контролировать это на принимающей стороне.

Мои вопросы.

  1. Это какая-то плохая практика по какой-то причине?
  2. Есть ли лучший способ сделать это, что это?

    func main() {
        ch := make(chan string)
        go func() {
            tasks := []string{}
            for {
                tasks = append(tasks,<- ch)
    
                if len(tasks) < 3 {
                    fmt.Println("Queue still to small")
                }
                if len(tasks) > 3 {
                    for i := 0; i < len(tasks); i++ {
                        fmt.Println(tasks[i])
                    }
                }
            }
        }()
    
        ch <- "Msg 1"
        time.Sleep(time.Second)
        ch <- "Msg 2"
        time.Sleep(time.Second)
        ch <- "Msg 3"
        time.Sleep(time.Second)
        ch <- "Msg 4"
        time.Sleep(time.Second)
    }
    

Правка для более простого и точного примера.

Ответы [ 2 ]

0 голосов
/ 12 ноября 2018

Я вижу, как что-то, что объединяет результаты, может быть полезным. Но это требует индивидуального решения. Есть много способов решить эту проблему - я пытался использовать Sync.WaitGroup, но это было грязно. Похоже, использование sync.Mutex для блокировки функции дозирования - лучший способ. Но, когда мьютекс - самый хороший ответ imo, это должно вызвать перепроверку дизайна, потому что, опять же, imo, это должен быть последний вариант.

package main

import (
    "context"
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {

    ctx, canc := context.WithCancel(context.Background())
    acc := NewAccumulator(4, ctx)
    go func() {
        for i := 0; i < 10; i++ {
            acc.Write("hi")
        }
        canc()
    }()

    read := acc.ReadChan()
    for batch := range read {
        fmt.Println(batch)
    }
    fmt.Println("done")
}

type Accumulator struct {
    count    int64
    size     int
    in       chan string
    out      chan []string
    ctx      context.Context
    doneFlag int64
    mu   sync.Mutex
}

func NewAccumulator(size int, parentCtx context.Context) *Accumulator {
    a := &Accumulator{
        size: size,
        in:   make(chan string, size),
        out:  make(chan []string, 1),
        ctx:  parentCtx,
    }

    go func() {
        <-a.ctx.Done()
        atomic.AddInt64(&a.doneFlag, 1)
        close(a.in)
        a.mu.Lock()
        a.batch()
        a.mu.Unlock()
        close(a.out)
    }()
    return a
}

func (a *Accumulator) Write(s string) {
    if atomic.LoadInt64(&a.doneFlag) > 0 {
        panic("write to closed accumulator")
    }
    a.in <- s
    atomic.AddInt64(&a.count, 1)
    a.mu.Lock()
    if atomic.LoadInt64(&a.count) == int64(a.size) {
        a.batch()
    }
    a.mu.Unlock()
}

func (a *Accumulator) batch() {
    batch := make([]string, 0)
    for i := 0; i < a.size; i++ {
        msg := <-a.in
        if msg != "" {
            batch = append(batch, msg)
        }
    }
    fmt.Println("batching", batch)
    a.out <- batch
    atomic.StoreInt64(&a.count, 0)
}

func (a *Accumulator) ReadChan() <-chan []string {
    return a.out
}

Вероятно, лучше всего иметь срез, который накапливает строки, и когда этот срез достигает некоторого размера, вы начинаете некоторую обработку.

0 голосов
/ 12 ноября 2018

Исходя из нескольких комментариев, похоже, что вы ищете какую-то форму пакетирования.

Пакетирование имеет несколько сценариев, когда вы хотите взять пакет и отправить его вместе:

  1. Размер пакета достаточного размера
  2. Достаточно времени прошло, и частичная партия должна быть сброшена

В данном примере не учитывается второй сценарий.Это может привести к некоторому неловкому поведению, если вы просто не выполняете сброс, потому что перестали получать нагрузку.

Поэтому я бы рекомендовал либо заглянуть в библиотеку (например, cloudfoundry / go-batching ), либо простоиспользовать каналы, таймер и оператор выбора.

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan string)
    go func() {
        tasks := []string{}
        timer := time.NewTimer(time.Second) // Adjust this based on a reasonable user experience
        for {
            select {
            case <-timer.C:
                fmt.Println("Flush partial batch due to time")
                flush(tasks)
                tasks = nil
                timer.Reset(time.Second)
            case data := <-ch:
                tasks = append(tasks, data)

                // Reset the timer for each data point so that we only flush
                // partial batches when we stop receiving data.
                if !timer.Stop() {
                    <-timer.C
                }
                timer.Reset(time.Second)

                // Guard clause to for batch size
                if len(tasks) < 3 {
                    fmt.Println("Queue still too small")
                    continue
                }

                flush(tasks)
                tasks = nil // reset tasks
            }
        }
    }()

    ch <- "Msg 1"
    time.Sleep(time.Second)
    ch <- "Msg 2"
    time.Sleep(time.Second)
    ch <- "Msg 3"
    time.Sleep(time.Second)
    ch <- "Msg 4"
    time.Sleep(time.Second)
}

func flush(tasks []string) {
    // Guard against emtpy flushes
    if len(tasks) == 0 {
        return
    }

    fmt.Println("Flush")
    for _, t := range tasks {
        fmt.Println(t)
    }
}
...