Синхронизировать запись в файл из тяжелых операций в разных потоках - PullRequest
0 голосов
/ 08 июня 2018

Мне нужно разработать файл (потенциально большой файл) по одному блоку за раз и записать результат в новый файл.Проще говоря, у меня есть основная функция для разработки блока:

func elaborateBlock(block []byte) []byte { ... }

Каждый блок должен быть обработан и затем последовательно записан в выходной файл (с сохранением исходного порядка).

Однопоточная реализация тривиальна:

for {
        buffer := make([]byte, BlockSize)
        _, err := inputFile.Read(buffer)

        if err == io.EOF {
            break
        }
        processedData := elaborateBlock(buffer)
        outputFile.Write(processedData)
}

Но разработка может быть тяжелой, и каждый блок может обрабатываться отдельно, поэтому многопоточная реализация является естественной эволюцией.

Решение, которое я придумал, состоит в том, чтобы создать массив каналов, вычислить каждый блок в отдельном потоке и синхронизировать окончательную запись, зациклив массив каналов:

Служебная функция:

func blockThread(channel chan []byte, block []byte) {
    channel <- elaborateBlock(block)
}

В основной программе:

chans = []chan []byte {}

for {
    buffer := make([]byte, BlockSize)
    _, err := inputFile.Read(buffer)

    if err == io.EOF {
        break
    }

    channel := make(chan []byte)
    chans = append(chans, channel)

    go blockThread(channel, buffer)
}

for i := range chans {
    data := <- chans[i]
    outputFile.Write(data)
}

Этот подход работает, но может быть проблематичным для больших файлов, поскольку он требует загрузки всего файла в память перед началом записи вывода.

Как вы думаете, может ли быть лучшее решение с более высокой производительностью в целом?

Ответы [ 2 ]

0 голосов
/ 08 июня 2018

Если блоки должны быть записаны в порядке

Если вы хотите работать с несколькими блоками одновременно, очевидно, вам нужно одновременно хранить несколько блоков в памяти.

Выможет решить, сколько блоков вы хотите обработать одновременно, и этого достаточно для одновременного считывания в память.Например, вы можете сказать, что хотите обрабатывать 5 блоков одновременно.Это ограничит использование памяти и все еще будет максимально использовать ресурсы вашего ЦП.Рекомендуется выбирать число в зависимости от доступных ядер ЦП (если при обработке блока еще не используется многоядерный процессор).Это можно запросить, используя runtime.GOMAXPROCS(0).

. У вас должна быть одна программа, которая последовательно читает входной файл, и выдает блоки, завернутые в Jobs (которые также содержат индекс блока).

У вас должно быть несколько рабочих процедур, предпочтительно столько же, сколько у вас ядер (но также экспериментируйте с меньшими и более высокими значениями).Каждая рабочая группа просто получает задания и вызывает elaborateBlock() данных и доставляет их в канал результатов.

Должен быть один назначенный потребитель, который получает выполненные задания и записывает их в порядкевыходной файл.Поскольку подпрограммы выполняются одновременно, и у нас нет никакого контроля, в каком порядке завершаются блоки, потребитель должен отслеживать индекс следующего блока, который будет записан в вывод.Блоки, поступающие не по порядку, должны храниться только и приступать к записи только при поступлении следующего блока.

Это (неполный) пример того, как сделать все это:

const BlockSize = 1 << 20 // 1 MB

func elaborateBlock(in []byte) []byte { return in }

type Job struct {
    Index int
    Block []byte
}

func producer(jobsCh chan<- *Job) {
    // Init input file:
    var inputFile *os.File

    for index := 0; ; index++ {
        job := &Job{
            Index: index,
            Block: make([]byte, BlockSize),
        }

        _, err := inputFile.Read(job.Block)
        if err != nil {
            break
        }

        jobsCh <- job
    }
}

func worker(jobsCh <-chan *Job, resultCh chan<- *Job) {
    for job := range jobsCh {
        job.Block = elaborateBlock(job.Block)
        resultCh <- job
    }
}

func consumer(resultCh <-chan *Job) {
    // Init output file:
    var outputFile *os.File

    nextIdx := 0
    jobMap := map[int]*Job{}

    for job := range resultCh {
        jobMap[job.Index] = job

        // Write out all blocks we have in contiguous index range:
        for {
            j := jobMap[nextIdx]
            if j == nil {
                break
            }
            if _, err := outputFile.Write(j.Block); err != nil {
                // handle error, maybe terminate?
            }
            delete(nextIdx) // This job is written out
            nextIdx++
        }
    }
}

func main() {
    jobsCh := make(chan *Job)
    resultCh := make(chan *Job)

    for i := 0; i < 5; i++ {
        go worker(jobsCh, resultCh)
    }

    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        consumer(resultCh)
    }()

    // Start producing jobs:
    producer(jobsCh)
    // No more jobs:
    close(jobsCh)

    // Wait for consumer to complete:
    wg.Wait()
}

OneОбратите внимание: это само по себе не гарантирует ограничение используемой памяти.Представьте себе случай, когда первый блок потребует огромных затрат времени, а последующие - нет.Что случилось бы?Первый блок занимал бы рабочего, а другие работники «быстро» заканчивали последующие блоки.Потребитель будет хранить все в памяти, ожидая завершения первого блока (так как он должен быть записан первым).Это может увеличить использование памяти.

Как можно этого избежать?

Путем введения пула заданий.Новые рабочие места не могут быть созданы произвольно, а взяты из пула.Если пул пуст, производитель должен ждать.Поэтому, когда производителю нужен новый Job, он берет его из пула.Когда потребитель выписал Job, поместил его обратно в пул.Просто как тот.Это также уменьшит нагрузку на сборщик мусора, поскольку задания (и большие буферы []byte) не создаются и не выбрасываются, их можно использовать повторно.

Для простой реализации пула Job вы могли быиспользовать буферизованный канал.Подробнее см. Как реализовать пул памяти в Golang .

Если блоки могут быть записаны в любом порядке

Другим вариантом может быть предварительное выделение выходного файла.Если размер выходных блоков также является детерминированным, вы можете сделать это (например, outsize := (insize / blocksize) * outblockSize).

С какой целью?

Если у вас предварительно выделен выходной файл, потребительне нужно ждать входных блоков по порядку.Как только блок ввода рассчитан, вы можете вычислить позицию, в которой он будет находиться на выходе, искать эту позицию и просто записать ее.Для этого вы можете использовать File.Seek().

Это решение все еще требует отправки индекса блока от производителя к потребителю, но потребителю не нужно хранить блоки, поступающие изпорядка, поэтому потребитель может быть проще и ему не нужно хранить завершенные блоки до тех пор, пока не прибудет следующий, чтобы продолжить запись выходного файла.

Обратите внимание, что это решение, естественно, не создает памятьУгроза, поскольку выполненные задания никогда не накапливаются / кэшируются, они записываются в порядке их завершения.


См. связанные вопросы для получения дополнительной информации и методов:

Является ли этоидиоматический пул рабочих потоков в Go?

Как собрать значения из N процедур, выполненных в определенном порядке?

0 голосов
/ 08 июня 2018

Вот рабочий пример, который должен работать и максимально приближен к вашему исходному коду.

Идея состоит в том, чтобы превратить ваш массив в канал каналов байтов.затем

  • сначала запустит потребителя, который будет читать на этом канале каналов, получать канал байтов, читать с него и записывать результат.

  • Вернувшись в основной поток, вы создаете канал байтов, записываете его в канал каналов (теперь потребитель, считывающий последовательно с них, будет читать результаты по порядку), а затем запускаетсяпроцесс, который будет выполнять работу и писать на выделенном канале (продюсеры).

, что теперь произойдет, так это то, что между прокураторами и потребителем произойдет «гонка», как только произведенный блок будет считан у потребителя и записан ресурсы, связанные сэто будет освобождено.это может быть улучшением вашего оригинального дизайна.

вот код и ссылка на игровую площадку:

package main

import (
    "bytes"
    "fmt"
    "io"
    "sync"
)

func elaborateBlock(b []byte) []byte {
    return []byte("werkwerkwerk")
}

func blockThread(channel chan []byte, block []byte, wg *sync.WaitGroup) {
    channel <- elaborateBlock(block)
    wg.Done()
}

func main() {
    chans := make(chan chan []byte)
    BlockSize := 3
    inputBytes := bytes.NewBuffer([]byte("transmutemetowerkwerkwerk"))

    producewg := sync.WaitGroup{}
    consumewg := sync.WaitGroup{}
    consumewg.Add(1)
    go func() {
        chancount := 0
        for ch := range chans {
            data := <-ch
            fmt.Printf("got %d block, result:%s\n", chancount, data)
            chancount++
        }
        fmt.Printf("done receiving\n")
        consumewg.Done()
    }()
    for {
        buffer := make([]byte, BlockSize)
        _, err := inputBytes.Read(buffer)

        if err == io.EOF {
            go func() {
                //wait for all the procuders to finish
                producewg.Wait()
                //then close the main channel to notify the consumer
                close(chans)
            }()
            break
        }

        channel := make(chan []byte)
        chans <- channel //give the channel that we return the result to the receiver

        producewg.Add(1)
        go blockThread(channel, buffer, &producewg)
    }

    consumewg.Wait()
    fmt.Printf("main exiting")
}

игровая ссылка

какНезначительное замечание: я не чувствую себя правильно в операторе «чтение всего файла в память», потому что вы просто читаете блок каждый раз из Reader, может быть, «уместнее хранить результат всего вычисления в памяти»?

...