У меня есть проект, в котором мы читаем данные из какого-то источника, обрабатываем их, затем записываем некоторое подмножество и, возможно, сжатую версию этих данных на диск.Мы записываем во многие (обычно около 200) разные файлы, чтобы соответствовать разным «каналам» данных, и часто записываем несколько МБ / с на диск между этими файлами.
Я вижу это примерно один раз каждыеминута или около того, вызов Write
или Flush
занимает около 500 мс.Я бы по возможности уменьшил это число до 50 мс.
Я попытался создать минимальный рабочий пример (MWE) с этим странным поведением, но я не могу воспроизвести длинные вызовы Write
и Flush
на машине, которая имеет такое поведение, в нашем полном проекте.Однако на моем ноутбуке MacBook Pro я могу вызвать 50 мс Flush
звонков, что в 100 раз больше, чем обычно.Попытка MWE приведена ниже, и она воспроизводит так, как мы пишем данные почти точно.Он имеет несколько флагов для сброса в различных частях кода, я проверял, помог ли вызов Flush
в определенных местах.Любое руководство о том, как получить согласованное время для звонков Flush
и Write
, было бы очень полезно.Даже если среднее время записи было медленнее, я просто хочу последовательного поведения.
package main
import (
"bufio"
"fmt"
"io/ioutil"
"os"
"os/signal"
"sync"
"time"
)
const flushWithinBlock = true
const flushAfterBlocks = true
type Writer struct {
FileName string
headerWritten bool
writer *bufio.Writer
}
func (w *Writer) writeHeader() error {
file, err := os.Create(w.FileName)
if err != nil {
return err
}
w.writer = bufio.NewWriterSize(file, 32768)
w.writer.WriteString("HEADER\n")
w.headerWritten = true
return nil
}
func (w *Writer) writeRecord(nBytes int) error {
data := make([]byte, nBytes)
nWritten, err := w.writer.Write(data)
if nWritten != nBytes {
return fmt.Errorf("wrong number of bytes written")
}
return err
}
func main() {
dirname, err0 := ioutil.TempDir("", "")
if err0 != nil {
panic(err0)
}
fmt.Println(dirname)
recordLength := 500
numberOfChannels := 240
recordsPerChanPerTick := 5
writers := make([]*Writer, numberOfChannels)
abortChan := make(chan struct{})
for i := range writers {
writers[i] = &Writer{FileName: fmt.Sprintf("%v/%v.ljh", dirname, i)}
}
go func() {
signalChan := make(chan os.Signal)
signal.Notify(signalChan, os.Interrupt)
<-signalChan
close(abortChan)
}()
tickDuration := 50 * time.Millisecond
ticker := time.NewTicker(tickDuration)
z := 0
tLast := time.Now()
fmt.Printf("recordsPerChanPerTick %v, Chans %v, tickDuration %v\n", recordsPerChanPerTick, numberOfChannels, tickDuration)
fmt.Printf("records/second/chan %v, records/second total %v\n", float64(recordsPerChanPerTick)/tickDuration.Seconds(), float64(recordsPerChanPerTick*numberOfChannels)/tickDuration.Seconds())
fmt.Printf("megabytes/second total %v\n", float64(recordLength)*float64(recordsPerChanPerTick*numberOfChannels)/tickDuration.Seconds()*1e-6)
fmt.Printf("flushWithinBlock %v, flushAfterBlocks %v\n", flushWithinBlock, flushAfterBlocks)
for {
// 1. here we would get data from data source
z++
select {
case <-abortChan:
fmt.Println("clean exit")
return
case <-ticker.C:
var wg sync.WaitGroup
writeDurations := make([]time.Duration, numberOfChannels)
flushDurations := make([]time.Duration, numberOfChannels)
for i, w := range writers {
wg.Add(1)
go func(w *Writer, i int) {
tStart := time.Now()
defer wg.Done()
for j := 0; j < recordsPerChanPerTick; j++ {
if !w.headerWritten {
err := w.writeHeader()
if err != nil {
panic(fmt.Sprintf("failed create file and write header: %v\n", err))
}
}
w.writeRecord(recordLength)
}
tWrite := time.Now()
if flushWithinBlock {
w.writer.Flush()
}
writeDurations[i] = tWrite.Sub(tStart)
flushDurations[i] = time.Now().Sub(tWrite)
}(w, i)
}
wg.Wait()
for _, w := range writers {
if flushAfterBlocks {
w.writer.Flush()
}
}
var writeSum time.Duration
var flushSum time.Duration
var writeMax time.Duration
var flushMax time.Duration
for i := range writeDurations {
writeSum += writeDurations[i]
flushSum += flushDurations[i]
if writeDurations[i] > writeMax {
writeMax = writeDurations[i]
}
if flushDurations[i] > flushMax {
flushMax = flushDurations[i]
}
}
if z%100 == 0 || time.Now().Sub(tLast) > 75*time.Millisecond {
fmt.Printf("z %v, time.Now().Sub(tLast) %v\n", z, time.Now().Sub(tLast))
fmt.Printf("writeMean %v, flushMean %v, writeMax %v, flushMax %v\n", writeSum/time.Duration(numberOfChannels), flushSum/time.Duration(numberOfChannels), writeMax, flushMax)
}
tLast = time.Now()
}
}
}
Пример вывода на машине Ubuntu 16 с вращающимся жестким диском, это фактическое оборудование, которое имеет 500 мс Write
и Flush
вызовов в нашем полном проекте:
/tmp/296105809
recordsPerChanPerTick 5, Chans 240, tickDuration 50ms
records/second/chan 100, records/second total 24000
megabytes/second total 12
flushWithinBlock true, flushAfterBlocks true
z 1, time.Now().Sub(tLast) 75.96973ms
writeMean 14.017745ms, flushMean 7.847µs, writeMax 24.761147ms, flushMax 420.896µs
z 100, time.Now().Sub(tLast) 50.13856ms
writeMean 1.71µs, flushMean 4.213µs, writeMax 12.271µs, flushMax 32.133µs
z 200, time.Now().Sub(tLast) 50.006063ms
writeMean 1.651µs, flushMean 3.032µs, writeMax 79.006µs, flushMax 7.246µs
z 300, time.Now().Sub(tLast) 50.151421ms
writeMean 1.685µs, flushMean 4.542µs, writeMax 10.429µs, flushMax 14.087µs
z 400, time.Now().Sub(tLast) 50.059208ms
Пример вывода на MacBook Pro с SSD.Здесь вы можете увидеть гораздо более длинные вызовы Write
и Flush
, но в диапазоне 500 мс ничего нет.Обратите внимание на 30 мс flushMax
в строке 81 против более типичных 500 us flushMax
в строке 100.
/var/folders/_0/25kp6h7x25v6vyjv2yjlcnkm000wrm/T/934618054
recordsPerChanPerTick 5, Chans 240, tickDuration 50ms
records/second/chan 100, records/second total 24000
megabytes/second total 12
flushWithinBlock true, flushAfterBlocks true
z 1, time.Now().Sub(tLast) 84.897446ms
writeMean 10.265068ms, flushMean 464.53µs, writeMax 24.752873ms, flushMax 3.528286ms
... some output removed
... NOTE, line 81 was printed because it took longer than normal
z 81, time.Now().Sub(tLast) 75.804358ms
writeMean 15.056µs, flushMean 18.324892ms, writeMax 408.406µs, flushMax 30.765425ms
z 100, time.Now().Sub(tLast) 54.753448ms
writeMean 3.25µs, flushMean 84.963µs, writeMax 74.152µs, flushMax 499.322µs