Атомно изменить два числа без использования замков - PullRequest
5 голосов
/ 28 июня 2019

Я создаю структуру данных только для добавления, которая находится в памяти и добавляет записи, сериализованные в байтовые массивы, в эту память. Мне нужно, чтобы это было потокобезопасным и чрезвычайно быстрым, поэтому я придумал следующий код, который до сих пор прекрасно работает (это псевдокод, реальная версия более сложна и выполняет некоторые другие вещи, но только для того, чтобы понять)

public sealed class MemoryList : IDisposable
{
    private int nextOffset = 0;
    private readonly MemoryMappedFile file;
    private readonly MemoryMappedViewAccessor va;

    public MemoryList(uint capacity)
    {
        // Some checks on capacity here
        var mapName = Guid.NewGuid().ToString("N");
        this.file = MemoryMappedFile.CreateNew(mapName, capacity);
        this.va = file.CreateViewAccessor(0, capacity);
    }

    public void AppendMessage(byte[] messagePayload)
    {
        if (messagePayload == null) 
            throw new ArgumentNullException(nameof(messagePayload));
        if (messagePayload.Length == 0)
            throw new ArgumentOutOfRangeException(nameof(messagePayload));

        if (TryReserveCapacity(messagePayload.Length, out var offsetToWriteTo))
        {
            this.va.Write(offsetToWriteTo, messagePayload.Length);
            this.va.WriteArray(offsetToWriteTo + sizeof(int), messagePayload, 0, messagePayload.Length);
        }
    }

    private bool TryReserveCapacity(int dataLength, out long reservedOffset)
    {
        // reserve enough room to store data + its size
        var packetSize = sizeof(int) + dataLength;
        reservedOffset = Interlocked.Add(ref this.nextOffset, packetSize) - packetSize;

        if (this.nextOffset <= this.va.Capacity)
            return true;
        reservedOffset = -1;
        return false;
    }

    public void Dispose()
    {
        file?.Dispose();
        va?.Dispose();
    }
}

Это очень быстро и работает очень хорошо. Я не смог сломать его, как бы я ни старался.

Так что теперь мне нужно для каждого добавленного сообщения метод TryReserveCapacity для вывода логического индекса каждого сообщения. Итак, для первого сообщения получаем индекс 0, для второго - индекс 1 и т. Д. Это приводит к использованию двух вызовов Interlocked, одного для offset и одного для messageIndex, которые, по-видимому, не являются поточно-ориентированными, и я могу столкнуться с условиями гонки, приводящими к следующей ситуации.

MI: 101, смещение: 10000 MI: 100, смещение: 10500

Есть идеи, как гарантировать, что ни один MI не будет больше, чем другой MI с большим смещением? И все это без использования замков?

Так в принципе, как мы можем изменить следующий метод для правильного поведения?

private bool TryReserveCapacity(int dataLength, out long reservedOffset, out long messageId)
{
    // reserve enough room to store data + its size
    var packetSize = sizeof(int) + dataLength;
    reservedOffset = Interlocked.Add(ref this.nextOffset, packetSize) - packetSize;
    messageId = Interlocked.Increment(ref this.currentMessageId);

    if (this.nextOffset <= this.va.Capacity)
        return true;
    reservedOffset = -1;
    return false;
}

P.S. Мне известно о проблемах с порядком байтов в примере кода, но, как я уже сказал, просто рассмотрите псевдокод, чтобы проиллюстрировать проблему.

1 Ответ

1 голос
/ 29 июня 2019

Извините, если это не относится непосредственно к вашей основной проблеме (неблокирующая атомарность), но я вижу, что вы манипулируете отображенными в память файлами, используя классы MemoryMappedFile и MemoryMappedViewAccessor.

Я действительно не знаю, обращались ли к этому текущие итерации .NET Framework, но в кодовой базе, которую мы написали около трех лет назад, мы обнаружили, что преобразование файлов в памяти с использованием этих классов предлагало действительно плохо Производительность (примерно в 7 раз медленнее, если я правильно помню) по сравнению с использованием Win32 API и манипулированием указателями в отображенной памяти даже внутри управляемого C ++ / CLI класса.

Я настоятельно рекомендую вам протестировать этот метод, вы можете быть удивлены приростом производительности (как мы, конечно, сделали), и, возможно, прирост производительности настолько значителен, что он позволяет вам позволить себе затраты на стандартную блокировку для достижения атомарность, которую вы желаете.

Если вы хотите изучить этот путь, вот фрагмент кода, который показывает основы техники.

Int32 StationHashStorage::Open() {
   msclr::lock lock(_syncRoot);
   if( _isOpen )
      return 0;
   String^ fileName = GetFullFileName();

   _szInBytes = ComputeFileSizeInBytes(fileName);
   String^ mapExtension = GetFileExtension();
   String^ mapName = String::Format("{0}{1}_{2}", _stationId, _date.ToString("yyyyMMdd"), mapExtension);

   marshal_context context;
   LPCTSTR pMapName = context.marshal_as<const TCHAR*>(mapName);

   {
      msclr::lock lock( _openLock );
         // Try to see if another storage instance has requested the same memory-mapped file and share it
         _hMapping = OpenFileMapping(FILE_MAP_READ | FILE_MAP_WRITE, FALSE, pMapName);
         if( !_hMapping ) {
            // This is the first instance acquiring the file
            LPCTSTR pFileName = context.marshal_as<const TCHAR*>(fileName);
            // Try to open the existing file, or create new one if not exists
            _hFile = CreateFile(pFileName, 
                                GENERIC_READ | GENERIC_WRITE, 
                                FILE_SHARE_READ,
                                NULL,
                                OPEN_ALWAYS,
                                FILE_ATTRIBUTE_NORMAL,
                                NULL);
            if( !_hFile )
               throw gcnew IOException(String::Format(Strings::CreateFileFailed, GetLastError(), _stationId));
            _hMapping = CreateFileMapping(_hFile, 
                                          NULL,
                                          PAGE_READWRITE | SEC_COMMIT,
                                          0,
                                          _szInBytes,
                                          pMapName);
            if( !_hMapping ) 
               throw gcnew IOException(String::Format(Strings::CreateMappingFailed, GetLastError(), _stationId));
            _usingSharedFile = false;
         } else {
            _usingSharedFile = true;
         }
      }

// _pData gives you access to the entire requested memory range, you can directly
// dereference it,  memcopy it, etc.

   _pData = (UInt32*)::MapViewOfFile(_hMapping, FILE_MAP_READ | FILE_MAP_WRITE, 0, 0, 0);

   if( !_pData ) 
      throw gcnew IOException(String::Format(Strings::MapViewOfFileFailed, ::GetLastError(), _stationId));

   // warm-up the view by touching every page
   Int32 dummy = 0;
   for( int i = 0; i < _szInBytes / sizeof(Int32); i+= 1024 ) {
      dummy ^=  _pData[i];
   }
   // return the dummy value to prevent the optimizer from removing the apparently useless loop
   _isOpen = true;
   return dummy;
}

void StationHashStorage::Cleanup() {
     if( !_disposed ) {
      // dispose unmanaged resources here
      if( _pData ) {
         if( !UnmapViewOfFile(_pData) ) 
            LOG_ERROR(Strings::UnmapViewOfFileFailed, ::GetLastError(), _stationId);
         _pData = NULL;
      }

      if( _hMapping ) {
         if( !CloseHandle(_hMapping) ) 
            LOG_ERROR(Strings::CloseMappingFailed, ::GetLastError(), _stationId);
         _hMapping = NULL;
      }


      if( _hFile ) {
         if( !CloseHandle(_hFile) ) 
            LOG_ERROR(Strings::CloseFileFailed, ::GetLastError(), _stationId);
         _hFile = NULL;
      }
      _disposed = true;
   }
}

Теперь относительно вашего реального вопроса. Возможно ли, что вы внедрили сгенерированный идентификатор как часть потока данных? Моя идея будет выглядеть примерно так:

  1. Предварительно записать все содержимое вашей памяти с использованием фиктивного известного значения (возможно, 0xffffffff).

  2. Используйте текущую атомную логику проверки емкости.

  3. После написания полезной нагрузки сообщения, вы немедленно пишете вычисленный идентификатор сообщения (ваша проверка емкости должна учитывать эти дополнительные данные)

  4. Вместо использования Interlocked.Add для получения следующего Id, вы должны ввести цикл, который проверяет память до текущего сообщения (предыдущего идентификатора сообщения), пока оно не станет отличным от вашего фиктивного известная ценность. После выхода из цикла текущим идентификатором сообщения будет считанное значение + 1.

Это потребует некоторых специальных манипуляций с первым вставленным сообщением (поскольку для этого нужно заполнить первый маркер Id в вашем потоке. Вам также нужно быть осторожным (если вы используете длинные идентификаторы и находитесь в 32-битном режиме) ) что ваш Id-поток читает и пишет атомарно.

Удачи, и я действительно призываю вас попробовать Win32 API, было бы очень интересно узнать, улучшились ли, надеюсь, вещи! Не стесняйтесь обращаться ко мне, если вам нужна помощь с кодом C ++ / CLI.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...