Я вижу, как что-то, что объединяет результаты, может быть полезным. Но это требует индивидуального решения. Есть много способов решить эту проблему - я пытался использовать Sync.WaitGroup
, но это было грязно. Похоже, использование sync.Mutex
для блокировки функции дозирования - лучший способ. Но, когда мьютекс - самый хороший ответ imo, это должно вызвать перепроверку дизайна, потому что, опять же, imo, это должен быть последний вариант.
package main
import (
"context"
"fmt"
"sync"
"sync/atomic"
)
func main() {
ctx, canc := context.WithCancel(context.Background())
acc := NewAccumulator(4, ctx)
go func() {
for i := 0; i < 10; i++ {
acc.Write("hi")
}
canc()
}()
read := acc.ReadChan()
for batch := range read {
fmt.Println(batch)
}
fmt.Println("done")
}
type Accumulator struct {
count int64
size int
in chan string
out chan []string
ctx context.Context
doneFlag int64
mu sync.Mutex
}
func NewAccumulator(size int, parentCtx context.Context) *Accumulator {
a := &Accumulator{
size: size,
in: make(chan string, size),
out: make(chan []string, 1),
ctx: parentCtx,
}
go func() {
<-a.ctx.Done()
atomic.AddInt64(&a.doneFlag, 1)
close(a.in)
a.mu.Lock()
a.batch()
a.mu.Unlock()
close(a.out)
}()
return a
}
func (a *Accumulator) Write(s string) {
if atomic.LoadInt64(&a.doneFlag) > 0 {
panic("write to closed accumulator")
}
a.in <- s
atomic.AddInt64(&a.count, 1)
a.mu.Lock()
if atomic.LoadInt64(&a.count) == int64(a.size) {
a.batch()
}
a.mu.Unlock()
}
func (a *Accumulator) batch() {
batch := make([]string, 0)
for i := 0; i < a.size; i++ {
msg := <-a.in
if msg != "" {
batch = append(batch, msg)
}
}
fmt.Println("batching", batch)
a.out <- batch
atomic.StoreInt64(&a.count, 0)
}
func (a *Accumulator) ReadChan() <-chan []string {
return a.out
}
Вероятно, лучше всего иметь срез, который накапливает строки, и когда этот срез достигает некоторого размера, вы начинаете некоторую обработку.