Если блоки должны быть записаны в порядке
Если вы хотите работать с несколькими блоками одновременно, очевидно, вам нужно одновременно хранить несколько блоков в памяти.
Выможет решить, сколько блоков вы хотите обработать одновременно, и этого достаточно для одновременного считывания в память.Например, вы можете сказать, что хотите обрабатывать 5 блоков одновременно.Это ограничит использование памяти и все еще будет максимально использовать ресурсы вашего ЦП.Рекомендуется выбирать число в зависимости от доступных ядер ЦП (если при обработке блока еще не используется многоядерный процессор).Это можно запросить, используя runtime.GOMAXPROCS(0)
.
. У вас должна быть одна программа, которая последовательно читает входной файл, и выдает блоки, завернутые в Jobs (которые также содержат индекс блока).
У вас должно быть несколько рабочих процедур, предпочтительно столько же, сколько у вас ядер (но также экспериментируйте с меньшими и более высокими значениями).Каждая рабочая группа просто получает задания и вызывает elaborateBlock()
данных и доставляет их в канал результатов.
Должен быть один назначенный потребитель, который получает выполненные задания и записывает их в порядкевыходной файл.Поскольку подпрограммы выполняются одновременно, и у нас нет никакого контроля, в каком порядке завершаются блоки, потребитель должен отслеживать индекс следующего блока, который будет записан в вывод.Блоки, поступающие не по порядку, должны храниться только и приступать к записи только при поступлении следующего блока.
Это (неполный) пример того, как сделать все это:
const BlockSize = 1 << 20 // 1 MB
func elaborateBlock(in []byte) []byte { return in }
type Job struct {
Index int
Block []byte
}
func producer(jobsCh chan<- *Job) {
// Init input file:
var inputFile *os.File
for index := 0; ; index++ {
job := &Job{
Index: index,
Block: make([]byte, BlockSize),
}
_, err := inputFile.Read(job.Block)
if err != nil {
break
}
jobsCh <- job
}
}
func worker(jobsCh <-chan *Job, resultCh chan<- *Job) {
for job := range jobsCh {
job.Block = elaborateBlock(job.Block)
resultCh <- job
}
}
func consumer(resultCh <-chan *Job) {
// Init output file:
var outputFile *os.File
nextIdx := 0
jobMap := map[int]*Job{}
for job := range resultCh {
jobMap[job.Index] = job
// Write out all blocks we have in contiguous index range:
for {
j := jobMap[nextIdx]
if j == nil {
break
}
if _, err := outputFile.Write(j.Block); err != nil {
// handle error, maybe terminate?
}
delete(nextIdx) // This job is written out
nextIdx++
}
}
}
func main() {
jobsCh := make(chan *Job)
resultCh := make(chan *Job)
for i := 0; i < 5; i++ {
go worker(jobsCh, resultCh)
}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
consumer(resultCh)
}()
// Start producing jobs:
producer(jobsCh)
// No more jobs:
close(jobsCh)
// Wait for consumer to complete:
wg.Wait()
}
OneОбратите внимание: это само по себе не гарантирует ограничение используемой памяти.Представьте себе случай, когда первый блок потребует огромных затрат времени, а последующие - нет.Что случилось бы?Первый блок занимал бы рабочего, а другие работники «быстро» заканчивали последующие блоки.Потребитель будет хранить все в памяти, ожидая завершения первого блока (так как он должен быть записан первым).Это может увеличить использование памяти.
Как можно этого избежать?
Путем введения пула заданий.Новые рабочие места не могут быть созданы произвольно, а взяты из пула.Если пул пуст, производитель должен ждать.Поэтому, когда производителю нужен новый Job
, он берет его из пула.Когда потребитель выписал Job
, поместил его обратно в пул.Просто как тот.Это также уменьшит нагрузку на сборщик мусора, поскольку задания (и большие буферы []byte
) не создаются и не выбрасываются, их можно использовать повторно.
Для простой реализации пула Job
вы могли быиспользовать буферизованный канал.Подробнее см. Как реализовать пул памяти в Golang .
Если блоки могут быть записаны в любом порядке
Другим вариантом может быть предварительное выделение выходного файла.Если размер выходных блоков также является детерминированным, вы можете сделать это (например, outsize := (insize / blocksize) * outblockSize
).
С какой целью?
Если у вас предварительно выделен выходной файл, потребительне нужно ждать входных блоков по порядку.Как только блок ввода рассчитан, вы можете вычислить позицию, в которой он будет находиться на выходе, искать эту позицию и просто записать ее.Для этого вы можете использовать File.Seek()
.
Это решение все еще требует отправки индекса блока от производителя к потребителю, но потребителю не нужно хранить блоки, поступающие изпорядка, поэтому потребитель может быть проще и ему не нужно хранить завершенные блоки до тех пор, пока не прибудет следующий, чтобы продолжить запись выходного файла.
Обратите внимание, что это решение, естественно, не создает памятьУгроза, поскольку выполненные задания никогда не накапливаются / кэшируются, они записываются в порядке их завершения.
См. связанные вопросы для получения дополнительной информации и методов:
Является ли этоидиоматический пул рабочих потоков в Go?
Как собрать значения из N процедур, выполненных в определенном порядке?