Реализация буфера для записи данных из нескольких потоков? - PullRequest
1 голос
/ 12 января 2011

Моя программа использует итератор для прохождения по карте и порождает несколько рабочих потоков для обработки точек из итератора чтения, и это хорошо. Теперь я хотел бы записать вывод для каждой точки, и для этого я использую буфер памяти, чтобы обеспечить сбор данных из потоков в правильном порядке перед их записью в файл (через другой итератор для записи) :

public class MapMain
{
    // Multiple threads used here, each thread starts in Run() 
    // requests and processes map points

    public void Run()
    {
        // Get point from somewhere and process point
        int pointIndex = ...

        bufferWriter.StartPoint(pointIndex);

        // Perform a number of computations.
        // For simplicity, numberOfComputations = 1 in this example   
        bufferedWriter.BufferValue(pointIndex, value);

        bufferWriter.EndPoint(pointIndex); 
    }
}

Моя попытка реализовать буфер:

public class BufferWriter
{
  private const int BufferSize = 4;

  private readonly IIterator iterator;
  private readonly float?[] bufferArray;
  private readonly bool[] bufferingCompleted;
  private readonly SortedDictionary<long, int> pointIndexToBufferIndexMap;
  private readonly object syncObject = new object();  

  private int bufferCount = 0;
  private int endBufferCount = 0;

  public BufferWriter(....)
  {
      iterator = ...
      bufferArray = new float?[BufferSize];
      bufferingCompleted = new bool[BufferSize];
      pointIndexToBufferIndexMap = new SortedDictionary<long, int>();
  }

  public void StartPoint(long pointIndex)
  {
    lock (syncObject)
    {
        if (bufferCount == BufferSize)
        {
            Monitor.Wait(syncObject);
        }

        pointIndexToBufferIndexMap.Add(pointIndex, bufferCount);   
        bufferCount++;
    }
  }

  public void BufferValue(long pointIndex, float value)
  {
      lock (syncObject)
      {
          int bufferIndex = pointIndexToBufferIndexMap[pointIndex];
          bufferArray[bufferIndex] = value;          
      }
  }

  public void EndPoint(long pointIndex)
  {
      lock (syncObject)
      {
          int bufferIndex = pointIndexToBufferIndexMap[pointIndex];
          bufferingCompleted[bufferIndex] = true;

          endBufferCount++;
          if (endBufferCount == BufferSize)
          {
              FlushBuffer();
              Monitor.PulseAll(syncObject);
          }
      }
  }

  private void FlushBuffer()
  {
      // Iterate in order of points
      foreach (long pointIndex in pointIndexToBufferIndexMap.Keys)
      {
          // Move iterator 
          iterator.MoveNext();

          int bufferIndex = pointIndexToBufferIndexMap[pointIndex];

          if (bufferArray[bufferIndex].HasValue)
          {                  
              iterator.Current = bufferArray[bufferIndex];

              // Clear to null
              bufferArray[bufferIndex] = null;                  
          }
      }

      bufferCount = 0;
      endBufferCount = 0;
      pointIndexToBufferIndexMap.Clear();
  }        
}

Я ищу отзывы, чтобы исправить и исправить ошибки в моем коде и решить любые проблемы с производительностью:

[1] Короче говоря: у меня есть буфер фиксированного размера, который собирает данные из нескольких точек обработки потоков в несколько случайном порядке. Когда буфер полностью заполняется данными, он должен быть очищен. Но что, если я собрал очки от 0 до 9, а точка 8 отсутствовала? Мой буфер уже заполнен, и любая точка, пытающаяся использовать буфер, будет блокироваться до тех пор, пока не будет выполнена очистка, для которой требуется точка 8.

[2] Порядок значений в буфере не соответствует порядку точек карты, к которым относятся значения. Если бы это было так, тогда я думаю, что очистка будет проще (доступ к массиву быстрее, чем время поиска SortedDictionary?). Кроме того, это может позволить нам повторно использовать очищенные слоты для входящих данных (циклический буфер?)

Но я не могу придумать рабочую модель для достижения этой цели.

[3] Буфер ждет, пока он полностью не заполнится, прежде чем очищать. Во многих случаях поток вызывает EndPoint(), а iterator.Current ссылается на эту точку. Возможно, имеет смысл немедленно «написать» (то есть вызвать «iterator.Current» и один раз перечислить) для этой точки, но как это можно сделать?

Для ясности, запись iterator в BufferWriter имеет собственный уровень буфера для кэширования значений, вызванных в его свойстве Current перед записью в вывод, но мне не о чем беспокоиться. *

Я чувствую, что все это нужно переписать с нуля!

Любая помощь приветствуется, спасибо.

Ответы [ 2 ]

1 голос
/ 12 января 2011

Это мое решение, которое должно работать, хотя я его не проверял. Добавить новое поле:

private readonly Queue<AutoResetEvent> waitHandles = new Queue<AutoResetEvent>();

Два if (начало и конец) требуют изменения на:

Начало:

if (bufferCount == BufferSize)
{
    AutoResetEvent ev = new AutoResetEvent( false );
    waitHandles.Enqueue( ev );
    ev.WaitOne();
}

Конец:

if (endBufferCount == BufferSize)
{
   FlushBuffer();
   for ( int i = 0; i < Math.Min( waitHandles.Count, BufferSize ); ++i )
   {
      waitHandles.Dequeue().Set();
   }
}
1 голос
/ 12 января 2011

Я бы не делал параллелизм "вручную", передавал его в TPL или PLINQ.Поскольку вы говорите о карте, у вас есть фиксированный набор точек, которые можно перечислить по координатам, и позвольте PLINQ беспокоиться о параллельности.

Пример:

// first get your map points, could be just a lazy iterator over every map point
IEnumerable<MapPoint> mapPoints = ...
//Now use PLINQ to compute in parallel, maintain order
var computedMapPoints = mapPoints.AsParallel()
                        .AsOrdered()
                        .Select(mappoint => ComputeMapPoint(mappoint)).ToList();
...