Передача строки C переменной длины через общую память Mmap - PullRequest
0 голосов
/ 05 июля 2018

Допустим, у меня есть процесс A и процесс B, и процесс A хотел бы передать строку C процессу B через общую память shm_open () + mmap ().

Какой самый эффективный способ задержки?

Ответ на этот пост говорит о том, что после C ++ 11 std :: atomic является правильным способом обмена данными через разделяемую память.

Однако я не вижу, как написать что-то, чтобы написать строку C с чем-то вроде этого:

struct Buffer {
std::atomic<uint32_t> length;
std::atomic<char*> str;
} __attribute__((packed));

Учитывая, что у меня есть общая память, созданная следующим образом:

class SHM {
    char* _ptr;
public:
    SHM() {
        const auto handle = shm_open("myTest", O_RDWR|O_CREAT, 0666);
        const auto size =  4 * 1024 * 1024;
        if (-1 == ftruncate(handle, size)) {
            throw;
        }
        _ptr = (char*)mmap(0,size , PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0);

        if(_ptr == MAP_FAILED){
            throw;
        }

        int rc = fchmod(handle, 0666);
        if (rc == -1) {
            throw;
        }
    }

    // assume to caller will do buffer.size.store(someLength, std::memory_order_release); after filling up Buffer::str
    Buffer& getBuffer() noexcept {
        return *reinrepret_cast<Buffer*>(_ptr);
    }

    Buffer& read() {
        auto& buffer = *reinrepret_cast<Buffer*>(_ptr);
        while (buffer.size.load(std::memory_order_acquire) > 0) {
            buffer.str.load(std::memory_order_relaxed);
            return buffer;
        }
    }
};

Как вызывающий абонент SHM::getBuffer() может правильно записать в Buffer :: str char by char, чтобы процесс B мог вызвать SHM::read() для извлечения?

Действительно ли buffer.str.load (std :: memory_order_relaxed) загружается атомарно и правильно? Я сомневаюсь в этом, поскольку он даже не знает длины.

Это для Linux, X86-64, GCC 7.

Заранее спасибо.

1 Ответ

0 голосов
/ 05 июля 2018

Вот рабочий эскиз для случая одного производителя и одного потребителя (независимо от того, являются ли потоки производителя / потребителя одним и тем же процессом или нет), без ожидания:

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <unistd.h>
#include <fcntl.h>
#include <utility>
#include <cstring>
#include <string>
#include <atomic>

class SingleProducerSingleConsumerIndexes {
    std::atomic<uint64_t> produced_ = {};
    std::atomic<uint64_t> consumed_ = {};

public: // Producer interface.
    uint64_t produced() {
        auto consumed = consumed_.load(std::memory_order_acquire); // Syncronizes with store 2.
        auto produced = produced_.load(std::memory_order_relaxed);
        if(produced != consumed || !produced)
            return produced;
        // Entire buffer was consumed. Rewind.
        produced_.store(0, std::memory_order_release); // Store 1.
        consumed_.store(0, std::memory_order_relaxed); // Store 3.
        return 0;
    }

    void produce(uint64_t end) {
        produced_.store(end, std::memory_order_release); // Store 1.
    }

public: // Consumer interface.
    std::pair<uint64_t, uint64_t> available() const {
        auto produced = produced_.load(std::memory_order_acquire); // Syncronizes with store 1.
        auto consumed = consumed_.load(std::memory_order_relaxed);
        // min handles the case of store 3 not visible yet.
        return {std::min(produced, consumed), produced};
    }

    void consume(uint64_t end) {
        consumed_.store(end, std::memory_order_release); // Store 2.
    }
};

class SharedMemoryStrings {
    void* p_;
    static constexpr int size = 4 * 1024 * 1024;
    static constexpr int buffer_size = size - sizeof(SingleProducerSingleConsumerIndexes);
public:
    SharedMemoryStrings() {
        auto handle = ::shm_open("/another-test", O_RDWR|O_CREAT, 0666);
        if(-1 == ::ftruncate(handle, size))
            throw;
        p_ = ::mmap(0,size , PROT_READ | PROT_WRITE, MAP_SHARED, handle, 0);
        ::close(handle);
        if(p_ == MAP_FAILED)
            throw;
    }

    ~SharedMemoryStrings() {
        ::munmap(p_, size);
    }

    void produce(std::string const& s) {
        auto* indexes = static_cast<SingleProducerSingleConsumerIndexes*>(p_);
        auto produced = indexes->produced();
        uint64_t new_end = produced + sizeof(uint64_t) + s.size();
        if(new_end > buffer_size)
            throw; // Out of buffer space.

        auto* buffer = reinterpret_cast<char*>(indexes + 1) + produced;
        uint64_t size = s.size();
        memcpy(buffer, &size, sizeof size);
        buffer += sizeof size;
        memcpy(buffer, s.data(), s.size());

        indexes->produce(new_end);
    }

    bool try_consume(std::string& s) {
        auto* indexes = static_cast<SingleProducerSingleConsumerIndexes*>(p_);
        auto available = indexes->available();
        auto consumed = available.first;
        auto produced = available.second;
        if(consumed == produced)
            return false; // No data available.

        auto* buffer = reinterpret_cast<char const*>(indexes + 1) + consumed;
        uint64_t size;
        memcpy(&size, buffer, sizeof size);
        buffer += sizeof size;
        // Reuse the string to minimize memory allocations.
        s.assign(buffer, size);

        indexes->consume(consumed + sizeof(uint64_t) + size);
        return true;
    }
};

int main(int ac, char** av) {
    if(ac > 1) {
        // Producer.
        SharedMemoryStrings a;
        for(int i = 1; i < ac; ++i)
            a.produce(av[i]);
    }
    else {
        // Consumer.
        SharedMemoryStrings a;
        for(std::string s;;) { // Busy-wait loop.
            if(a.try_consume(s)) // Reuse the string to minimize memory allocations.
                printf("%s\n", s.c_str());
            // else // Potential optimization.
            //     _mm_pause();
        }
    }
}

Примечания:

  • Скомпилируйте код как g++ -o test -W{all,extra,error} -std=gnu++11 -O3 -DNDEBUG -march=native -pthread -lrt test.cc. Предполагая, что этот источник называется test.cc.

  • Запустить потребителя без аргументов, ./test. Производитель с аргументами, вроде ./test hello world. Порядок старта не имеет значения.

  • Это решение для одного производителя и для одного потребителя. Это без ожидания (вызовы производителя и потребителя выполняются за фиксированное количество инструкций, без цикла), что лучше, чем просто без блокировки (что не гарантирует завершение в фиксированное количество инструкций). Не может идти быстрее, что это.

  • На x86-64 они получают и освобождают атомарные нагрузки и хранят компиляцию в простые инструкции mov, потому что текущая модель памяти x86-64 слишком сильна. Однако использование std::atomic и определенных порядков памяти гарантирует, что компилятор не переупорядочивает инструкции. И это также гарантирует, что код компилируется и корректно работает на архитектурах с более слабыми моделями памяти и вставляет соответствующие барьеры, если необходимо, что volatile не может сделать. Как, например, PowerPC. Использование volatile аналогично использованию std::memory_order_relaxed. См. Сравнение сборки .

  • produced_.store(end, std::memory_order_release); гарантирует, что все предыдущие хранилища (memcpy в общей памяти), созданные потоком производителя, станут видимыми для потока потребителя, как только эффект этого хранилища станет видимым produced_.load(std::memory_order_acquire);. См. http://preshing.com/20130823/the-synchronizes-with-relation/ для тщательного изучения предмета. Также std::memory_order говорит это лучше всего:

    memory_order_acquire Операция загрузки с этим порядком памяти выполняет операцию получения в уязвимом месте памяти: никакие операции чтения или записи в текущем потоке не могут быть переупорядочены до этой загрузки. Все записи в других потоках, которые выпускают одну и ту же атомарную переменную, видны в текущем потоке.

    memory_order_release Операция сохранения с этим порядком памяти выполняет операцию освобождения: после этого хранилища нельзя изменить порядок чтения или записи в текущем потоке. Все записи в текущем потоке видны в других потоках, которые получают ту же атомарную переменную , и записи, которые переносят зависимость в атомарную переменную, становятся видимыми в других потоках, которые потребляют тот же атомарный.

  • Производитель определяет, когда потребитель использовал все доступные данные. В этом случае производитель перематывает буфер на старт. Это сделано для того, чтобы избежать обработки переноса буфера для кольцевого буфера. Если потребитель не может обработать сообщения достаточно быстро, буфер будет заполнен в конечном счете независимо от этого.

  • Он никогда не вызывает SingleProducerSingleConsumerIndexes конструктор. Он основан на том факте, что новый файл инициализируется нулями, и это то, что сделал бы конструктор. В более сложных сценариях необходимо вызвать конструктор общих данных, если файл только что был создан. Это можно сделать, создав сначала временный файл с уникальным именем (если файл еще не существует), сопоставив файл в памяти и вызвав конструктор. Затем переименуйте этот временный файл в окончательное имя (rename является атомарным). Если переименование не удалось, поскольку файл уже существует, удалите временный файл и начните снова.

  • Потребитель занят в ожидании минимально возможной задержки. Если вы хотите, чтобы потребитель блокировал во время ожидания, можно добавить общий мьютекс процесса и переменную условия, чтобы это произошло. Однако для пробуждения потока, ожидающего условную переменную (futex в Linux) в ядре, требуется несколько микросекунд. Для этого потребуется вызвать конструктор SingleProducerSingleConsumerIndexes, чтобы выполнить всю необходимую инициализацию (например, инициализировать надежный адаптивный мьютекс с общим процессом и переменную условия с общим процессом).

...