Очередь MPSC: условия гонки - PullRequest
0 голосов
/ 11 февраля 2019

Я пытаюсь внедрить однопользовательскую очередь с несколькими производителями без блокировки на основе , написанной Дмитрием Вьюковым на C .

Единственный тест, который я написал до сих порпочти работает.Но потребитель обычно пропускает ровно одну вещь, либо первую, либо вторую.Иногда потребитель пропускает около половины входных данных.

Как сейчас, он не блокируется.Он блокируется каждый раз, когда используется оператор new, но я надеюсь заставить его работать и написать еще несколько исчерпывающих тестов, прежде чем возиться с распределителями.

// src/MpscQueue.hpp

#pragma once

#include <memory>
#include <atomic>
#include <optional>

/**
 * Adapted from http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
 * @tparam T
 */
template< typename T >
class MpscQueue {
public:
    MpscQueue() {
        stub.next.store( nullptr );
        head.store( &stub );
        tail = &stub;
    }

    void push( const T& t ) {
        emplace( t );
    }

    void push( T&& t ) {
        emplace( std::move( t ));
    }

    template< typename ... Args >
    void emplace( Args...args ) {
        auto node = new Node{ std::make_unique<T>( std::forward<Args>( args )... ), nullptr };
        push( node );
    }

    /**
     * Returns an item from the queue and returns a unique pointer to it.
     *
     * If the queue is empty returns a unique pointer set to nullptr
     *
     * @return A unique ptr to the popped item
     */
    std::unique_ptr<T> pop() {
        Node* tailCopy = tail;
        Node* next     = tailCopy->next.load();
        auto finalize = [ & ]() {
            tail = next;
            std::unique_ptr<Node> p( tailCopy ); // free the node memory after we return
            return std::move( tail->value );
        };

        if ( tailCopy == &stub ) {
            if ( next == nullptr ) return nullptr;
            tail     = next;
            tailCopy = next;
            next     = next->next;
        }

        if ( next ) return std::move( finalize());

        if ( tail != head.load()) return nullptr;

        push( &stub );
        next = tailCopy->next;

        return next ? std::move( finalize()) : nullptr;
    }

private:
    struct Node {
        std::unique_ptr<T> value;
        std::atomic<Node*> next;
    };

    void push( Node* node ) {
        Node* prev = head.exchange( node );
        prev->next = node;
    }

    Node               stub;
    std::atomic<Node*> head;
    Node* tail;
};

// test/main.cpp

#pragma clang diagnostic push
#pragma ide diagnostic ignored "OCUnusedMacroInspection"
#define BOOST_TEST_MODULE test_module
#pragma clang diagnostic pop

#include <boost/test/unit_test.hpp>

// test/utils.hpp
#pragma once

#include <vector>

template< class T >
void removeFromBothIfIdentical( std::vector<T>& a, std::vector<T>& b ) {
    size_t i = 0;
    size_t j = 0;
    while ( i < a.size() && j < b.size()) {
        if ( a[ i ] == b[ j ] ) {
            a.erase( a.begin() + i );
            b.erase( b.begin() + j );
        }
        else if ( a[ i ] < b[ j ] ) ++i;
        else if ( a[ i ] > b[ j ] ) ++j;
    }
}

namespace std {
    template< typename T >
    std::ostream& operator<<( std::ostream& ostream, const std::vector<T>& container ) {
        if ( container.empty())
            return ostream << "[]";
        ostream << "[";
        std::string_view separator;
        for ( const auto& item: container ) {
            ostream << item << separator;
            separator = ", ";
        }
        return ostream << "]";
    }
}

template< class T >
std::vector<T> extractDuplicates( std::vector<T>& container ) {
    auto           iter = std::unique( container.begin(), container.end());
    std::vector<T> duplicates;
    std::move( iter, container.end(), back_inserter( duplicates ));
    return duplicates;
}

#define CHECK_EMPTY( container, message ) \
BOOST_CHECK_MESSAGE( (container).empty(), (message) << ": " << (container) )

// test/MpscQueue.cpp
#pragma ide diagnostic ignored "cert-err58-cpp"

#include <thread>
#include <numeric>
#include <boost/test/unit_test.hpp>
#include "../src/MpscQueue.hpp"
#include "utils.hpp"

using std::thread;
using std::vector;
using std::back_inserter;

BOOST_AUTO_TEST_SUITE( MpscQueueTestSuite )

    BOOST_AUTO_TEST_CASE( two_producers ) {
        constexpr int  until = 1000;
        MpscQueue<int> queue;

        thread producerEven( [ & ]() {
            for ( int i = 0; i < until; i += 2 )
                queue.push( i );
        } );

        thread producerOdd( [ & ]() {
            for ( int i = 1; i < until; i += 2 )
                queue.push( i );
        } );

        vector<int> actual;

        thread consumer( [ & ]() {
            using namespace std::chrono_literals;
            std::this_thread::sleep_for( 2ms );
            while ( auto n = queue.pop())
                actual.push_back( *n );
        } );

        producerEven.join();
        producerOdd.join();
        consumer.join();

        vector<int> expected( until );
        std::iota( expected.begin(), expected.end(), 0 );

        std::sort( actual.begin(), actual.end());

        vector<int> duplicates = extractDuplicates( actual );
        removeFromBothIfIdentical( expected, actual );

        CHECK_EMPTY( duplicates, "Duplicate items" );
        CHECK_EMPTY( expected, "Missing items" );
        CHECK_EMPTY( actual, "Extra items" );
    }

BOOST_AUTO_TEST_SUITE_END()

1 Ответ

0 голосов
/ 11 февраля 2019

Мой пример с несколькими производителями для одного потребителя написан на Аде.Я предлагаю это как источник виртуального «псевдокода» на ваше рассмотрение.Пример поставляется в трех файлах.

В этом примере реализован простой регистратор данных с несколькими производителями, общим буфером и одним потребителем, который регистрирует строки, созданные производителями.

Первый файл - это спецификация пакета для общего буфера,Спецификации пакета Ada определяют API для сущностей, определенных в пакете.В этом случае объекты являются защищенным буфером и процедурой остановки регистратора.

-----------------------------------------------------------------------
-- Asynchronous Data Logger
-----------------------------------------------------------------------
with Ada.Strings.Unbounded; use Ada.Strings.Unbounded;

package Async_Logger is
   type Queue_Index is mod 256;
   type Queue_T is array (Queue_Index) of Unbounded_String;

   protected Buffer is
      entry Put (Log_Entry : in String);
      entry Get (Stamped_Entry : out Unbounded_String);
   private
      Queue   : Queue_T;
      P_Index : Queue_Index := 0;
      G_Index : Queue_Index := 0;
      Count   : Natural     := 0;
   end Buffer;

   procedure Stop_Logging;

end Async_Logger;

Записи в защищенном буфере позволяют задачам (то есть потокам) записывать в буфер и читать из буфера.Записи автоматически выполняют все необходимое управление блокировкой буфера.

Реализация кода буфера и процедура Stop_Logging реализованы в теле пакета.Потребитель, ведущий запись в журнал, также реализован в теле задачи, что делает его невидимым для производящих потоков.

with Ada.Calendar;            use Ada.Calendar;
with Ada.Calendar.Formatting; use Ada.Calendar.Formatting;
with Ada.Text_IO;             use Ada.Text_IO;

package body Async_Logger is

   ------------
   -- Buffer --
   ------------

   protected body Buffer is

      ---------
      -- Put --
      ---------

      entry Put (Log_Entry : in String) when Count < Queue_Index'Modulus is
         T_Stamp : Time             := Clock;
         Value   : Unbounded_String :=
           To_Unbounded_String
             (Image (Date => T_Stamp, Include_Time_Fraction => True) & " : " &
              Log_Entry);
      begin
         Queue (P_Index) := Value;
         P_Index         := P_Index + 1;
         Count           := Count + 1;
      end Put;

      ---------
      -- Get --
      ---------

      entry Get (Stamped_Entry : out Unbounded_String) when Count > 0 is
      begin
         Stamped_Entry := Queue (G_Index);
         G_Index       := G_Index + 1;
         Count         := Count - 1;
      end Get;

   end Buffer;

   task Logger is
      entry Stop;
   end Logger;

   task body Logger is
      Phrase : Unbounded_String;
   begin
      loop
         select
            accept Stop;
            exit;
         else
            select
               Buffer.Get (Phrase);
               Put_Line (To_String (Phrase));
            or
               delay 0.01;
            end select;
         end select;
      end loop;

   end Logger;

   procedure Stop_Logging is
   begin
      Logger.Stop;
   end Stop_Logging;

end Async_Logger;

Запись Put имеет условие защиты, позволяющее выполнять эту запись только тогда, когда буфер неполный.Запись Get имеет условие защиты, позволяющее выполнять запись только тогда, когда буфер пуст.

Задача с именем Logger является задачей потребителя.Он работает до тех пор, пока не будет вызвана его запись Stop.

Процедура Stop_Logging вызывает запись Stop Регистратора.

Третий файл является "основной" процедурой, используемой для тестирования пакета Async_Logger.Этот файл создает двух производителей, P1 и P2.Каждый из этих производителей записывает 10 сообщений в буфер и затем завершает свою работу.

with Async_Logger; use Async_Logger;

procedure Async_Test is
   task P1;
   task P2;

   task body P1 is
   begin
      for I in 1..10 loop
         Buffer.Put(I'Image);
         delay 0.01;
      end loop;
   end P1;

   task body P2 is
      Num : Float := 0.0;
   begin
      for I in 1..10 loop
         Buffer.Put(Num'Image);
         Num := Num + 1.0;
         delay 0.01;
      end loop;
   end P2;

begin
   delay 0.2;
   Stop_Logging;
end Async_Test;

Процедура Async_Test просто ждет 0,2 секунды, а затем вызывает Stop_Logging.

Результат выполнения этой программы:

2019-02-11 18:35:01.83 :  1
2019-02-11 18:35:01.83 :  0.00000E+00
2019-02-11 18:35:01.85 :  1.00000E+00
2019-02-11 18:35:01.85 :  2
2019-02-11 18:35:01.87 :  3
2019-02-11 18:35:01.87 :  2.00000E+00
2019-02-11 18:35:01.88 :  3.00000E+00
2019-02-11 18:35:01.88 :  4
2019-02-11 18:35:01.90 :  5
2019-02-11 18:35:01.90 :  4.00000E+00
2019-02-11 18:35:01.92 :  6
2019-02-11 18:35:01.92 :  5.00000E+00
2019-02-11 18:35:01.93 :  6.00000E+00
2019-02-11 18:35:01.93 :  7
2019-02-11 18:35:01.95 :  7.00000E+00
2019-02-11 18:35:01.95 :  8
2019-02-11 18:35:01.96 :  8.00000E+00
2019-02-11 18:35:01.96 :  9
2019-02-11 18:35:01.98 :  10
2019-02-11 18:35:01.98 :  9.00000E+00
...