Моя программа использует итератор для прохождения по карте и порождает несколько рабочих потоков для обработки точек из итератора чтения, и это хорошо. Теперь я хотел бы записать вывод для каждой точки, и для этого я использую буфер памяти, чтобы обеспечить сбор данных из потоков в правильном порядке перед их записью в файл (через другой итератор для записи) :
public class MapMain
{
// Multiple threads used here, each thread starts in Run()
// requests and processes map points
public void Run()
{
// Get point from somewhere and process point
int pointIndex = ...
bufferWriter.StartPoint(pointIndex);
// Perform a number of computations.
// For simplicity, numberOfComputations = 1 in this example
bufferedWriter.BufferValue(pointIndex, value);
bufferWriter.EndPoint(pointIndex);
}
}
Моя попытка реализовать буфер:
public class BufferWriter
{
private const int BufferSize = 4;
private readonly IIterator iterator;
private readonly float?[] bufferArray;
private readonly bool[] bufferingCompleted;
private readonly SortedDictionary<long, int> pointIndexToBufferIndexMap;
private readonly object syncObject = new object();
private int bufferCount = 0;
private int endBufferCount = 0;
public BufferWriter(....)
{
iterator = ...
bufferArray = new float?[BufferSize];
bufferingCompleted = new bool[BufferSize];
pointIndexToBufferIndexMap = new SortedDictionary<long, int>();
}
public void StartPoint(long pointIndex)
{
lock (syncObject)
{
if (bufferCount == BufferSize)
{
Monitor.Wait(syncObject);
}
pointIndexToBufferIndexMap.Add(pointIndex, bufferCount);
bufferCount++;
}
}
public void BufferValue(long pointIndex, float value)
{
lock (syncObject)
{
int bufferIndex = pointIndexToBufferIndexMap[pointIndex];
bufferArray[bufferIndex] = value;
}
}
public void EndPoint(long pointIndex)
{
lock (syncObject)
{
int bufferIndex = pointIndexToBufferIndexMap[pointIndex];
bufferingCompleted[bufferIndex] = true;
endBufferCount++;
if (endBufferCount == BufferSize)
{
FlushBuffer();
Monitor.PulseAll(syncObject);
}
}
}
private void FlushBuffer()
{
// Iterate in order of points
foreach (long pointIndex in pointIndexToBufferIndexMap.Keys)
{
// Move iterator
iterator.MoveNext();
int bufferIndex = pointIndexToBufferIndexMap[pointIndex];
if (bufferArray[bufferIndex].HasValue)
{
iterator.Current = bufferArray[bufferIndex];
// Clear to null
bufferArray[bufferIndex] = null;
}
}
bufferCount = 0;
endBufferCount = 0;
pointIndexToBufferIndexMap.Clear();
}
}
Я ищу отзывы, чтобы исправить и исправить ошибки в моем коде и решить любые проблемы с производительностью:
[1] Короче говоря: у меня есть буфер фиксированного размера, который собирает данные из нескольких точек обработки потоков в несколько случайном порядке. Когда буфер полностью заполняется данными, он должен быть очищен. Но что, если я собрал очки от 0 до 9, а точка 8 отсутствовала? Мой буфер уже заполнен, и любая точка, пытающаяся использовать буфер, будет блокироваться до тех пор, пока не будет выполнена очистка, для которой требуется точка 8.
[2] Порядок значений в буфере не соответствует порядку точек карты, к которым относятся значения. Если бы это было так, тогда я думаю, что очистка будет проще (доступ к массиву быстрее, чем время поиска SortedDictionary?). Кроме того, это может позволить нам повторно использовать очищенные слоты для входящих данных (циклический буфер?)
Но я не могу придумать рабочую модель для достижения этой цели.
[3] Буфер ждет, пока он полностью не заполнится, прежде чем очищать. Во многих случаях поток вызывает EndPoint()
, а iterator.Current
ссылается на эту точку.
Возможно, имеет смысл немедленно «написать» (то есть вызвать «iterator.Current» и один раз перечислить) для этой точки, но как это можно сделать?
Для ясности, запись iterator
в BufferWriter имеет собственный уровень буфера для кэширования значений, вызванных в его свойстве Current
перед записью в вывод, но мне не о чем беспокоиться. *
Я чувствую, что все это нужно переписать с нуля!
Любая помощь приветствуется, спасибо.