время от времени (500+ мс) время записи или сброса в golang - PullRequest
0 голосов
/ 20 ноября 2018

У меня есть проект, в котором мы читаем данные из какого-то источника, обрабатываем их, затем записываем некоторое подмножество и, возможно, сжатую версию этих данных на диск.Мы записываем во многие (обычно около 200) разные файлы, чтобы соответствовать разным «каналам» данных, и часто записываем несколько МБ / с на диск между этими файлами.

Я вижу это примерно один раз каждыеминута или около того, вызов Write или Flush занимает около 500 мс.Я бы по возможности уменьшил это число до 50 мс.

Я попытался создать минимальный рабочий пример (MWE) с этим странным поведением, но я не могу воспроизвести длинные вызовы Write и Flush на машине, которая имеет такое поведение, в нашем полном проекте.Однако на моем ноутбуке MacBook Pro я могу вызвать 50 мс Flush звонков, что в 100 раз больше, чем обычно.Попытка MWE приведена ниже, и она воспроизводит так, как мы пишем данные почти точно.Он имеет несколько флагов для сброса в различных частях кода, я проверял, помог ли вызов Flush в определенных местах.Любое руководство о том, как получить согласованное время для звонков Flush и Write, было бы очень полезно.Даже если среднее время записи было медленнее, я просто хочу последовательного поведения.

package main

import (
    "bufio"
    "fmt"
    "io/ioutil"
    "os"
    "os/signal"
    "sync"
    "time"
)

const flushWithinBlock = true
const flushAfterBlocks = true

type Writer struct {
    FileName      string
    headerWritten bool
    writer        *bufio.Writer
}

func (w *Writer) writeHeader() error {
    file, err := os.Create(w.FileName)
    if err != nil {
        return err
    }
    w.writer = bufio.NewWriterSize(file, 32768)
    w.writer.WriteString("HEADER\n")
    w.headerWritten = true
    return nil
}

func (w *Writer) writeRecord(nBytes int) error {
    data := make([]byte, nBytes)
    nWritten, err := w.writer.Write(data)
    if nWritten != nBytes {
        return fmt.Errorf("wrong number of bytes written")
    }
    return err
}

func main() {
    dirname, err0 := ioutil.TempDir("", "")
    if err0 != nil {
        panic(err0)
    }
    fmt.Println(dirname)
    recordLength := 500
    numberOfChannels := 240
    recordsPerChanPerTick := 5
    writers := make([]*Writer, numberOfChannels)
    abortChan := make(chan struct{})
    for i := range writers {
        writers[i] = &Writer{FileName: fmt.Sprintf("%v/%v.ljh", dirname, i)}
    }
    go func() {
        signalChan := make(chan os.Signal)
        signal.Notify(signalChan, os.Interrupt)
        <-signalChan
        close(abortChan)
    }()

    tickDuration := 50 * time.Millisecond
    ticker := time.NewTicker(tickDuration)
    z := 0
    tLast := time.Now()
    fmt.Printf("recordsPerChanPerTick %v, Chans %v, tickDuration %v\n", recordsPerChanPerTick, numberOfChannels, tickDuration)
    fmt.Printf("records/second/chan %v, records/second total %v\n", float64(recordsPerChanPerTick)/tickDuration.Seconds(), float64(recordsPerChanPerTick*numberOfChannels)/tickDuration.Seconds())
    fmt.Printf("megabytes/second total %v\n", float64(recordLength)*float64(recordsPerChanPerTick*numberOfChannels)/tickDuration.Seconds()*1e-6)
    fmt.Printf("flushWithinBlock %v, flushAfterBlocks %v\n", flushWithinBlock, flushAfterBlocks)
    for {
        // 1. here we would get data from data source
        z++
        select {
        case <-abortChan:
            fmt.Println("clean exit")
            return
        case <-ticker.C:
            var wg sync.WaitGroup
            writeDurations := make([]time.Duration, numberOfChannels)
            flushDurations := make([]time.Duration, numberOfChannels)
            for i, w := range writers {
                wg.Add(1)
                go func(w *Writer, i int) {
                    tStart := time.Now()

                    defer wg.Done()
                    for j := 0; j < recordsPerChanPerTick; j++ {
                        if !w.headerWritten {
                            err := w.writeHeader()
                            if err != nil {
                                panic(fmt.Sprintf("failed create file and write header: %v\n", err))
                            }
                        }
                        w.writeRecord(recordLength)
                    }
                    tWrite := time.Now()
                    if flushWithinBlock {
                        w.writer.Flush()
                    }
                    writeDurations[i] = tWrite.Sub(tStart)
                    flushDurations[i] = time.Now().Sub(tWrite)
                }(w, i)
            }
            wg.Wait()
            for _, w := range writers {
                if flushAfterBlocks {
                    w.writer.Flush()
                }
            }
            var writeSum time.Duration
            var flushSum time.Duration
            var writeMax time.Duration
            var flushMax time.Duration
            for i := range writeDurations {
                writeSum += writeDurations[i]
                flushSum += flushDurations[i]
                if writeDurations[i] > writeMax {
                    writeMax = writeDurations[i]
                }
                if flushDurations[i] > flushMax {
                    flushMax = flushDurations[i]
                }
            }
            if z%100 == 0 || time.Now().Sub(tLast) > 75*time.Millisecond {
                fmt.Printf("z %v, time.Now().Sub(tLast) %v\n", z, time.Now().Sub(tLast))
                fmt.Printf("writeMean %v, flushMean %v, writeMax %v, flushMax %v\n", writeSum/time.Duration(numberOfChannels), flushSum/time.Duration(numberOfChannels), writeMax, flushMax)
            }
            tLast = time.Now()
        }
    }

}

Пример вывода на машине Ubuntu 16 с вращающимся жестким диском, это фактическое оборудование, которое имеет 500 мс Write и Flush вызовов в нашем полном проекте:

/tmp/296105809
recordsPerChanPerTick 5, Chans 240, tickDuration 50ms
records/second/chan 100, records/second total 24000
megabytes/second total 12
flushWithinBlock true, flushAfterBlocks true
z 1, time.Now().Sub(tLast) 75.96973ms
writeMean 14.017745ms, flushMean 7.847µs, writeMax 24.761147ms, flushMax 420.896µs
z 100, time.Now().Sub(tLast) 50.13856ms
writeMean 1.71µs, flushMean 4.213µs, writeMax 12.271µs, flushMax 32.133µs
z 200, time.Now().Sub(tLast) 50.006063ms
writeMean 1.651µs, flushMean 3.032µs, writeMax 79.006µs, flushMax 7.246µs
z 300, time.Now().Sub(tLast) 50.151421ms
writeMean 1.685µs, flushMean 4.542µs, writeMax 10.429µs, flushMax 14.087µs
z 400, time.Now().Sub(tLast) 50.059208ms

Пример вывода на MacBook Pro с SSD.Здесь вы можете увидеть гораздо более длинные вызовы Write и Flush, но в диапазоне 500 мс ничего нет.Обратите внимание на 30 мс flushMax в строке 81 против более типичных 500 us flushMax в строке 100.

/var/folders/_0/25kp6h7x25v6vyjv2yjlcnkm000wrm/T/934618054
recordsPerChanPerTick 5, Chans 240, tickDuration 50ms
records/second/chan 100, records/second total 24000
megabytes/second total 12
flushWithinBlock true, flushAfterBlocks true
z 1, time.Now().Sub(tLast) 84.897446ms
writeMean 10.265068ms, flushMean 464.53µs, writeMax 24.752873ms, flushMax 3.528286ms
... some output removed
... NOTE, line 81 was printed because it took longer than normal
z 81, time.Now().Sub(tLast) 75.804358ms
writeMean 15.056µs, flushMean 18.324892ms, writeMax 408.406µs, flushMax 30.765425ms
z 100, time.Now().Sub(tLast) 54.753448ms
writeMean 3.25µs, flushMean 84.963µs, writeMax 74.152µs, flushMax 499.322µs
...