В чем проблема с шаблоном функции в следующем коде? - PullRequest
1 голос
/ 03 февраля 2020

Я получил следующий код от http://hansewetz.blogspot.com/2014/07/using-queues-with-boostasio-part-i.html

#pragma once
#ifndef __QUEUE_LISTENER_H__
#define __QUEUE_LISTENER_H__

#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>
#include <cstddef>
#include <thread>

#include <utility>
#include <queue>
#include <mutex>
#include <condition_variable>

namespace boost {
    namespace asio {



        // a simple thread safe queue used as default queue in boost::asio::queue_listener
        template<typename T, typename Container = std::queue<T>>
        class simple_queue {
        public:
            // typedef for value stored in queue
            // (need this so we can create an item with default ctor)
            using value_type=T;

            // ctors,assign,dtor
            simple_queue() = default;
            simple_queue(simple_queue const&) = delete;
            simple_queue(simple_queue&&) = default;
            simple_queue& operator=(simple_queue const&) = delete;
            simple_queue& operator=(simple_queue&&) = default;
            ~simple_queue() = default;

            // put a message into queue
            void enq(T t) {
                std::lock_guard<std::mutex>lock(mtx_);
                q_.push(t);
                cond_.notify_all();
            }
            // dequeue a message (return.first == false if deq() was disabled)
            std::pair<bool, T>deq() {
                std::unique_lock<std::mutex>lock(mtx_);
                cond_.wait(lock, [&]() {return !deq_enabled_ || !q_.empty();});

                // if deq is disabled or queue is empty return 
                if (!deq_enabled_ || q_.empty()) {
                    return std::make_pair(false, T{});
                }
                // check if we have a message
                std::pair<bool, T>ret{ std::make_pair(true,q_.front()) };
                q_.pop();
                return ret;
            }
            // cancel deq operations (will also release blocking threads)
            void disable_deq(bool disable) {
                std::unique_lock<std::mutex>lock(mtx_);
                deq_enabled_ = !disable;
                cond_.notify_all();
            }
            // check if queue is empty
            bool empty()const {
                std::unique_lock<std::mutex>lock(mtx_);
                return q_.empty();
            }
        private:
            mutable std::mutex mtx_;
            mutable std::condition_variable cond_;
            bool deq_enabled_ = true;
            Container q_;
        };


        // forward decl
        class queue_listener_impl;
        template<typename Impl = queue_listener_impl>class basic_queue_listener_service;

        // --- IO Object (used by client) -----------------------------
        template<typename Service, typename Queue>
        class basic_queue_listener :public boost::asio::basic_io_object<Service> {
        public:
            // ctor
            explicit basic_queue_listener(boost::asio::io_service& io_service) :
                boost::asio::basic_io_object<Service>(io_service) {
            }
            // async deq operation
            template <typename Handler>
            void async_deq(std::shared_ptr<Queue>q, Handler handler) {
                // wace this->service.async_deq(this->implementation, q, handler);
                this->get_service().async_deq(this->get_implementation(), q, handler);

            }
        };
        // typedef for using standard service object
        template<typename T>
        using simple_queue_listener=basic_queue_listener<basic_queue_listener_service<>, simple_queue<T>>;

        // qlistener1.async_deq(q1, boost::bind(qhandler1<string>, _1, _2, &qlistener1, q1));
        // void qhandler1(boost::system::error_code const& ec, 
        //               T item, 
        //               boost::asio::simple_queue_listener<T>* asioq, 
        //               shared_ptr<boost::asio::simple_queue<string>>q1)

        // --- service class -----------------------------
        // (for one io_service, only one object created)
        template<typename Impl>
        class basic_queue_listener_service :public boost::asio::io_service::service {
        public:
            // required to have id of service
            static boost::asio::io_service::id id;

            // ctor
            explicit basic_queue_listener_service(boost::asio::io_service& io_service) :
                boost::asio::io_service::service(io_service) {
            }
            // dtor
            ~basic_queue_listener_service() {
            }
            // get a typedef  for implementation
            using implementation_type=std::shared_ptr<Impl>;

            // mandatory (construct an implementation object)
            void construct(implementation_type& impl) {

                impl.reset(new Impl(
                    //this->get_io_service()
                    this->get_io_context()
                ));


            }
            // mandatory (destroy an implementation object)
            void destroy(implementation_type& impl) {
                impl.reset();
            }
            // async sync deq operation
            template <typename Handler, typename Queue>
            void async_deq(implementation_type& impl, std::shared_ptr<Queue>q, Handler handler) {
                // this is a non-blocking operation so we are OK calling impl object in this thread
                impl->async_deq(impl, q, handler); // problem
            }
        private:
            // shutdown service (required)
            void shutdown_service() {
            }
        };
        // definition of id of service (required)
        template <typename Impl>
        boost::asio::io_service::id basic_queue_listener_service<Impl>::id;

        // --- implementation -----------------------------
        class queue_listener_impl : public std::enable_shared_from_this<queue_listener_impl> {
        public:
            // ctor (set up work queue for io_service so we don't bail out when executing run())
            queue_listener_impl(boost::asio::io_service& post_io_service) :
                impl_work_(new boost::asio::io_service::work(impl_io_service_)),
                impl_thread_([&]() {impl_io_service_.run();}),
                post_io_service_(post_io_service) {
            }
            // dtor (clear work queue, stop io service and join thread)
            ~queue_listener_impl() {
                impl_work_.reset(nullptr);
                impl_io_service_.stop();
                if (impl_thread_.joinable())impl_thread_.join();
            }
        public:
            // deque message (post request to thread)
            template<typename Handler, typename Queue>
            void async_deq(std::shared_ptr<queue_listener_impl>impl, std::shared_ptr<Queue>tq, Handler handler) {
                impl_io_service_.post(deq_operation<Handler, Queue>(impl, post_io_service_, tq, handler)); // problem
            }


        private:
            // function object calling blocking deq() on queue
            template <typename Handler, typename Queue>
            class deq_operation {
            public:
                // ctor
                deq_operation(std::shared_ptr<queue_listener_impl>impl, boost::asio::io_service& io_service, std::shared_ptr<Queue>tq, Handler handler) :
                    wimpl_(impl), io_service_(io_service), work_(io_service), tq_(tq), handler_(handler) {
                }

                // function calling implementation object - runs in the thread created in ctor
                void operator()() {
                    // make sure implementation object is still valid
                    std::shared_ptr<queue_listener_impl>impl{ wimpl_.lock() };

                    // if valid, go ahead and do blocking call on queue, otherwise post aborted message
                    if (impl) {
                        std::pair<bool, typename Queue::value_type>ret{ tq_->deq() };
                        boost::system::error_code ec = (!ret.first ? boost::asio::error::operation_aborted : boost::system::error_code());
                        auto x = boost::asio::detail::bind_handler(handler_, ec, ret.second);

                        this->io_service_.post(x); // problem
                    }
                    else {
                        //problem this->io_service_.post(boost::asio::detail::bind_handler(handler_, boost::asio::error::operation_aborted, typename Queue::value_type()));
                    }
                }
            private:
                std::weak_ptr<queue_listener_impl>wimpl_;
                boost::asio::io_service& io_service_;
                boost::asio::io_service::work work_;
                std::shared_ptr<Queue>tq_;
                Handler handler_;
            };
            // private data
            boost::asio::io_service impl_io_service_;
            std::unique_ptr<boost::asio::io_service::work>impl_work_;
            std::thread impl_thread_;
            boost::asio::io_service& post_io_service_;
        };
    }
}



#endif

и функции теста

//********** Queue Listener Test *****

#include <queue_listener.h>

#include <boost/asio.hpp>
#include <boost/log/trivial.hpp>
#include <boost/lexical_cast.hpp>
#include <string>
#include <memory>
#include <thread>
using namespace std;

// some constants
constexpr size_t maxmsg1{ 10 };
constexpr size_t tmoSeleepBetweenSendMs1{ 100 };

// queue listener handler for queue 1
size_t nreceived1{ 0 };
template<typename T>
void qhandler1(boost::system::error_code const& ec, T item, boost::asio::simple_queue_listener<T>* asioq, shared_ptr<boost::asio::simple_queue<string>>q1) {
    // print item if error code is OK
    if (ec)BOOST_LOG_TRIVIAL(debug) << "received item in qhandler1 (via asio), item: <invalid>, ec: " << ec;
    else {
        BOOST_LOG_TRIVIAL(debug) << "received item in qhandler1 (via asio), item: " << item << ", ec: " << ec;
        if (++nreceived1 != maxmsg1)asioq->async_deq(q1, std::bind(qhandler1<T>, _1, _2, asioq, q1));
    }
}
// queue sender for queue 1
size_t nsent{ 0 };
void senderq1(shared_ptr<boost::asio::simple_queue<string>>q1) {
    for (;nsent < maxmsg1;++nsent) {
        string item{ boost::lexical_cast<string>(nsent) };
        BOOST_LOG_TRIVIAL(debug) << "sending item \"" << item << "\"in separate thread ...";
        q1->enq(item);
        this_thread::sleep_for(std::chrono::milliseconds(tmoSeleepBetweenSendMs1));
    }
}
// test program
int queuetest() {
    try {
        // underlying queue
        shared_ptr<boost::asio::simple_queue<string>>q1{ new boost::asio::simple_queue<string> };

        // asio io service
        boost::asio::io_service ios;

        // asio queue listeners
        boost::asio::simple_queue_listener<string>qlistener1(ios);
        qlistener1.async_deq(q1, boost::bind(qhandler1<string>, _1, _2, &qlistener1, q1));

        // run a sender thread, run io service and join sender thread
        std::thread thrq1{ senderq1,q1 };
        ios.run();

        thrq1.join();
    }
    catch (exception const& e) {
        BOOST_LOG_TRIVIAL(debug) << "cought exception: " << e.what();
    }

    return 0;
}

Я получаю неприятную ошибку компиляции. Когда я удаляю строки, помеченные как «проблема», он компилируется.

Ошибка:

1> D: \ software \ libs \ boost \ boost \ asio \ detail \ bind_handler .hpp (165,1): ошибка C2893: не удалось специализировать шаблон функции 'неизвестный тип std :: _ Binder, boost :: asio :: simple_queue >>>> *, std :: shared_ptr >>>>), const boost :: arg <1> &, const boost :: arg <2> &, boost :: asio :: basic_queue_listener, boost :: asio :: simple_queue >>>> * &, std :: shared_ptr >>>> &> :: operator () (_ Unbound && ...) const '1> D: \ software \ libs \ boost \ boost \ asio \ detail \ bind_handler.hpp (165,1): ошибка C2893: с 1> D: \ software \ libs \ boost \ boost \ asio \ detail \ bind_handler.hpp (165,1): ошибка C2893: [1> D: \ software \ libs \ boost \ boost \ asio \ detail \ bind_handler.hpp (165,1): ошибка C2893: T = std :: string, 1> D: \ software \ libs \ boost \ boost \ asio \ detail \ bind_handler.hpp (165,1): ошибка C2893: _Ty = std :: string 1> D: \ software \ libs \ boost \ boost \ asio \ detail \ bind_handler.hpp (165,1): ошибка C2893:] 1> D: \ software \ libs \ boost \ boost \ asio \ detail \ bind_handler.hpp (165,1) : message: остроумие h следующие аргументы шаблона: 1> D: \ software \ libs \ boost \ boost \ asio \ detail \ bind_handler.hpp (165,1): message: '_Unbound = {const Arg1 &, const Arg2 &}'

Извините за количество кода, но я понятия не имею - поэтому я публикую то, что у меня есть.

1 Ответ

1 голос
/ 03 февраля 2020

Похоже, это результат неаккуратного управления пространством имен.

  • у вас using namespace std на верхнем уровне (это не рекомендуется )
  • похоже, что вы используете boost::bind в одном месте, но std::bind в другом
  • это подразумевает, что имеет смысл, что _1 и _2 будут относиться к бусту ::_1 вместо std::placeholders::_1. Однако:

    • не было квалификации
    • соответствующий заголовок не был включен (boost / bind.hpp)

    Я бы предложил придерживаться std :: bind

    using namespace std::placeholders;
    using std::bind;
    

    или исправление boost::bind использования:

     #include <boost/bind.hpp>
     using boost::bind;
    
  • queuetest() отсутствует оператор return

  • многочисленные небольшие улучшения (избегайте копирования общих указателей, избегайте необработанных новых, рассмотрите маркировку noexcept и override методов, нет необходимости в boost::lexical_cast<std::string>, когда вы можете std::to_string et c)

Фиксированная версия Live On Coliru

#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>
#include <cstddef>
#include <thread>

#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

namespace boost {
namespace asio {

// a simple thread safe queue used as default queue in boost::asio::queue_listener
template <typename T, typename Container = std::queue<T>> class simple_queue {
  public:
    // typedef for value stored in queue
    // (need this so we can create an item with default ctor)
    using value_type = T;

    // ctors,assign,dtor
    simple_queue() = default;
    simple_queue(simple_queue const&) = delete;
    simple_queue(simple_queue&&) = default;
    simple_queue& operator=(simple_queue const&) = delete;
    simple_queue& operator=(simple_queue&&) = default;
    ~simple_queue() = default;

    // put a message into queue
    void enq(T t) {
        std::lock_guard<std::mutex> lock(mtx_);
        q_.push(t);
        cond_.notify_all();
    }
    // dequeue a message (return.first == false if deq() was disabled)
    std::pair<bool, T> deq() {
        std::unique_lock<std::mutex> lock(mtx_);
        cond_.wait(lock, [&]() { return !deq_enabled_ || !q_.empty(); });

        // if deq is disabled or queue is empty return
        if (!deq_enabled_ || q_.empty()) {
            return std::make_pair(false, T{});
        }
        // check if we have a message
        std::pair<bool, T> ret{ std::make_pair(true, q_.front()) };
        q_.pop();
        return ret;
    }
    // cancel deq operations (will also release blocking threads)
    void disable_deq(bool disable) {
        std::unique_lock<std::mutex> lock(mtx_);
        deq_enabled_ = !disable;
        cond_.notify_all();
    }
    // check if queue is empty
    bool empty() const {
        std::unique_lock<std::mutex> lock(mtx_);
        return q_.empty();
    }

  private:
    mutable std::mutex mtx_;
    mutable std::condition_variable cond_;
    bool deq_enabled_ = true;
    Container q_;
};

// forward decl
class queue_listener_impl;
template <typename Impl = queue_listener_impl> class basic_queue_listener_service;

// --- IO Object (used by client) -----------------------------
template <typename Service, typename Queue> class basic_queue_listener : public boost::asio::basic_io_object<Service> {
  public:
    // ctor
    explicit basic_queue_listener(boost::asio::io_service& io_service) : boost::asio::basic_io_object<Service>(io_service) {}
    // async deq operation
    template <typename Handler> void async_deq(std::shared_ptr<Queue> q, Handler handler) {
        // wace this->service.async_deq(this->implementation, q, handler);
        this->get_service().async_deq(this->get_implementation(), q, handler);
    }
};
// typedef for using standard service object
template <typename T> using simple_queue_listener = basic_queue_listener<basic_queue_listener_service<>, simple_queue<T>>;

// qlistener1.async_deq(q1, bind(qhandler1<std::string>, _1, _2, &qlistener1, q1));
// void qhandler1(boost::system::error_code const& ec,
//               T item,
//               boost::asio::simple_queue_listener<T>* asioq,
//               shared_ptr<boost::asio::simple_queue<std::string>>q1)

// --- service class -----------------------------
// (for one io_service, only one object created)
template <typename Impl> class basic_queue_listener_service : public boost::asio::io_service::service {
  public:
    // required to have id of service
    static boost::asio::io_service::id id;

    // ctor
    explicit basic_queue_listener_service(boost::asio::io_service& io_service) : boost::asio::io_service::service(io_service) {}
    // dtor
    ~basic_queue_listener_service() override = default;
    // get a typedef  for implementation
    using implementation_type = std::shared_ptr<Impl>;

    // mandatory (construct an implementation object)
    void construct(implementation_type& impl) {
        impl = std::make_shared<Impl>(
            // this->get_io_service()
            this->get_io_context());
    }
    // mandatory (destroy an implementation object)
    void destroy(implementation_type& impl) { impl.reset(); }
    // async sync deq operation
    template <typename Handler, typename Queue> void async_deq(implementation_type& impl, std::shared_ptr<Queue> q, Handler handler) {
        // this is a non-blocking operation so we are OK calling impl object in this thread
        impl->async_deq(impl, q, handler); // problem
    }

  private:
    // shutdown service (required)
    void shutdown_service() override {}
};
// definition of id of service (required)
template <typename Impl> boost::asio::io_service::id basic_queue_listener_service<Impl>::id;

// --- implementation -----------------------------
class queue_listener_impl : public std::enable_shared_from_this<queue_listener_impl> {
  public:
    // ctor (set up work queue for io_service so we don't bail out when executing run())
    explicit queue_listener_impl(boost::asio::io_service& post_io_service)
            : impl_work_(new boost::asio::io_service::work(impl_io_service_)), impl_thread_([&]() { impl_io_service_.run(); }), post_io_service_(post_io_service) {}
    // dtor (clear work queue, stop io service and join thread)
    ~queue_listener_impl() {
        impl_work_.reset(nullptr);
        impl_io_service_.stop();
        if (impl_thread_.joinable()) {
            impl_thread_.join();
        }
    }

  public:
    // deque message (post request to thread)
    template <typename Handler, typename Queue> void async_deq(std::shared_ptr<queue_listener_impl> impl, std::shared_ptr<Queue> tq, Handler handler) {
        impl_io_service_.post(deq_operation<Handler, Queue>(impl, post_io_service_, tq, handler)); // problem
    }

  private:
    // function object calling blocking deq() on queue
    template <typename Handler, typename Queue> class deq_operation {
      public:
        // ctor
        deq_operation(std::shared_ptr<queue_listener_impl> const& impl, boost::asio::io_service& io_service, std::shared_ptr<Queue> tq, Handler handler)
                : wimpl_(impl), io_service_(io_service), work_(io_service), tq_(std::move(tq)), handler_(std::move(handler)) {}

        // function calling implementation object - runs in the thread created in ctor
        void operator()() {
            // make sure implementation object is still valid
            std::shared_ptr<queue_listener_impl> impl{ wimpl_.lock() };

            // if valid, go ahead and do blocking call on queue, otherwise post aborted message
            if (impl) {
                std::pair<bool, typename Queue::value_type> ret{ tq_->deq() };
                boost::system::error_code ec = (!ret.first ? boost::asio::error::operation_aborted : boost::system::error_code());
                auto x = boost::asio::detail::bind_handler(handler_, ec, ret.second);

                this->io_service_.post(x); // problem
            } else {
                // problem this->io_service_.post(boost::asio::detail::bind_handler(handler_, boost::asio::error::operation_aborted, typename Queue::value_type()));
            }
        }

      private:
        std::weak_ptr<queue_listener_impl> wimpl_;
        boost::asio::io_service& io_service_;
        boost::asio::io_service::work work_;
        std::shared_ptr<Queue> tq_;
        Handler handler_;
    };
    // private data
    boost::asio::io_service impl_io_service_;
    std::unique_ptr<boost::asio::io_service::work> impl_work_;
    std::thread impl_thread_;
    boost::asio::io_service& post_io_service_;
};
} // namespace asio
} // namespace boost

//********** Queue Listener Test *****

//#include <queue_listener.h>
#if 1
using namespace std::placeholders;
using std::bind;
#else
#include <boost/bind.hpp>
using boost::bind;
#endif

#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/log/trivial.hpp>
#include <memory>
#include <string>
#include <thread>

// some constants
constexpr size_t maxmsg1{ 10 };
constexpr size_t tmoSeleepBetweenSendMs1{ 100 };

// queue listener handler for queue 1
size_t nreceived1{ 0 };
template <typename T>
void qhandler1(boost::system::error_code const& ec, T item, boost::asio::simple_queue_listener<T>* asioq, std::shared_ptr<boost::asio::simple_queue<std::string>> q1) {
    // print item if error code is OK
    if (ec) {
        BOOST_LOG_TRIVIAL(debug) << "received item in qhandler1 (via asio), item: <invalid>, ec: " << ec;
    } else {
        BOOST_LOG_TRIVIAL(debug) << "received item in qhandler1 (via asio), item: " << item << ", ec: " << ec;
        if (++nreceived1 != maxmsg1) {
            asioq->async_deq(q1, bind(qhandler1<T>, _1, _2, asioq, q1));
        }
    }
}
// queue sender for queue 1
size_t nsent{ 0 };
void senderq1(std::shared_ptr<boost::asio::simple_queue<std::string>> const& q1) {
    for (; nsent < maxmsg1; ++nsent) {
        std::string item{ std::to_string(nsent) };
        BOOST_LOG_TRIVIAL(debug) << "sending item \"" << item << "\"in separate thread ...";
        q1->enq(item);
        std::this_thread::sleep_for(std::chrono::milliseconds(tmoSeleepBetweenSendMs1));
    }
}
// test program
int queuetest() {
    try {
        // underlying queue
        std::shared_ptr<boost::asio::simple_queue<std::string>> q1{ new boost::asio::simple_queue<std::string> };

        // asio io service
        boost::asio::io_service ios;

        // asio queue listeners
        boost::asio::simple_queue_listener<std::string> qlistener1(ios);
        qlistener1.async_deq(q1, bind(qhandler1<std::string>, _1, _2, &qlistener1, q1));

        // run a sender thread, run io service and join sender thread
        std::thread thrq1{ senderq1, q1 };
        ios.run();

        thrq1.join();
    } catch (std::exception const& e) {
        BOOST_LOG_TRIVIAL(debug) << "Caught exception: " << e.what();
    }
    return 0;
}

int main() { 
    return queuetest();
}

Печать

g++ -std=c++17 -Os -Wall -pedantic -pthread main.cpp -lboost_{system,thread,log,log_setup} -DBOOST_LOG_DYN_LINK && ./a.out
[2020-02-03 12:01:28.038827] [0x00007f0f24f57700] [debug]   sending item "0"in separate thread ...
[2020-02-03 12:01:28.039181] [0x00007f0f27f7d740] [debug]   received item in qhandler1 (via asio), item: 0, ec: system:0
[2020-02-03 12:01:28.139241] [0x00007f0f24f57700] [debug]   sending item "1"in separate thread ...
[2020-02-03 12:01:28.139458] [0x00007f0f27f7d740] [debug]   received item in qhandler1 (via asio), item: 1, ec: system:0
[2020-02-03 12:01:28.239550] [0x00007f0f24f57700] [debug]   sending item "2"in separate thread ...
[2020-02-03 12:01:28.239771] [0x00007f0f27f7d740] [debug]   received item in qhandler1 (via asio), item: 2, ec: system:0
[2020-02-03 12:01:28.339847] [0x00007f0f24f57700] [debug]   sending item "3"in separate thread ...
[2020-02-03 12:01:28.340025] [0x00007f0f27f7d740] [debug]   received item in qhandler1 (via asio), item: 3, ec: system:0
[2020-02-03 12:01:28.440149] [0x00007f0f24f57700] [debug]   sending item "4"in separate thread ...
[2020-02-03 12:01:28.440383] [0x00007f0f27f7d740] [debug]   received item in qhandler1 (via asio), item: 4, ec: system:0
[2020-02-03 12:01:28.540445] [0x00007f0f24f57700] [debug]   sending item "5"in separate thread ...
[2020-02-03 12:01:28.540678] [0x00007f0f27f7d740] [debug]   received item in qhandler1 (via asio), item: 5, ec: system:0
[2020-02-03 12:01:28.640795] [0x00007f0f24f57700] [debug]   sending item "6"in separate thread ...
[2020-02-03 12:01:28.641033] [0x00007f0f27f7d740] [debug]   received item in qhandler1 (via asio), item: 6, ec: system:0
[2020-02-03 12:01:28.741140] [0x00007f0f24f57700] [debug]   sending item "7"in separate thread ...
[2020-02-03 12:01:28.741368] [0x00007f0f27f7d740] [debug]   received item in qhandler1 (via asio), item: 7, ec: system:0
[2020-02-03 12:01:28.841478] [0x00007f0f24f57700] [debug]   sending item "8"in separate thread ...
[2020-02-03 12:01:28.841698] [0x00007f0f27f7d740] [debug]   received item in qhandler1 (via asio), item: 8, ec: system:0
[2020-02-03 12:01:28.941783] [0x00007f0f24f57700] [debug]   sending item "9"in separate thread ...
[2020-02-03 12:01:28.941923] [0x00007f0f27f7d740] [debug]   received item in qhandler1 (via asio), item: 9, ec: system:0
...