В рамках boost::asio
я хотел бы иметь асинхронную очередь: она должна позволять помещать объекты в очередь, как в обычную очередь, но извлекать их неким блокирующим «асинхронным извлечением» (async pop):
// outline of an idea only
....
....
// Shared ownership so the bound operation below keeps the queue alive.
std::shared_ptr<AsyncQueue> myAsyncQueue(new AsyncQueue(io_context));
....
// Bind the asynchronous pop to the queue instance and the completion callback.
auto x = std::bind(
    &AsyncQueue::async_dec, // pop
    myAsyncQueue,
    myCallback);
....
....
// Hand the bound operation to the io_context.
io_context.post(x);
Обратный вызов должен выполняться в рабочем потоке asio (asio-worker), как только в очереди появится элемент.
Я поискал готовые решения; то, что я нашёл, похоже, именно то, что мне нужно:
http://hansewetz.blogspot.com/2014/07/using-queues-with-boostasio-part-i.html
Я пытался проверить это, но получаю ошибку компилятора:
//********** Queue Listener Test *****
#include <queue_listener.h>
#include <boost/asio.hpp>
#include <boost/log/trivial.hpp>
#include <boost/lexical_cast.hpp>
#include <string>
#include <memory>
#include <thread>
using namespace std;
// Test parameters.
// Number of messages exchanged in the test: the sender loop stops after this
// many items and the handler stops re-arming after receiving this many.
constexpr size_t maxmsg1 = 10;
// Pause between two consecutive sends, in milliseconds.
constexpr size_t tmoSeleepBetweenSendMs1 = 100;
// queue listener handler for queue 1
size_t nreceived1{ 0 };
template<typename T>
void qhandler1(boost::system::error_code const& ec, T item, boost::asio::simple_queue_listener<T>* asioq, shared_ptr<boost::asio::simple_queue<string>>q1) {
// print item if error code is OK
if (ec)BOOST_LOG_TRIVIAL(debug) << "received item in qhandler1 (via asio), item: <invalid>, ec: " << ec;
else {
BOOST_LOG_TRIVIAL(debug) << "received item in qhandler1 (via asio), item: " << item << ", ec: " << ec;
if (++nreceived1 != maxmsg1)asioq->async_deq(q1, std::bind(qhandler1<T>, _1, _2, asioq, q1));
}
}
// queue sender for queue 1
// Number of items enqueued by senderq1 so far.
size_t nsent{ 0 };
// Producer: enqueues the decimal text of 0 .. maxmsg1-1 into q1, one item at a
// time, sleeping tmoSeleepBetweenSendMs1 milliseconds after each send.
void senderq1(shared_ptr<boost::asio::simple_queue<string>>q1) {
    const std::chrono::milliseconds pause(tmoSeleepBetweenSendMs1);
    while (nsent < maxmsg1) {
        const string payload = boost::lexical_cast<string>(nsent);
        BOOST_LOG_TRIVIAL(debug) << "sending item \"" << payload << "\"in separate thread ...";
        q1->enq(payload);
        this_thread::sleep_for(pause);
        ++nsent;
    }
}
// test program
int queuetest() {
try {
// underlying queue
shared_ptr<boost::asio::simple_queue<string>>q1{ new boost::asio::simple_queue<string> };
// asio io service
boost::asio::io_service ios;
// asio queue listeners
boost::asio::simple_queue_listener<string>qlistener1(ios);
qlistener1.async_deq(q1, std::bind(qhandler1<string>, std::placeholders::_1, std::placeholders::_2, &qlistener1, q1));
// run a sender thread, run io service and join sender thread
std::thread thrq1{ senderq1,q1 };
ios.run();
thrq1.join();
}
catch (exception const& e) {
BOOST_LOG_TRIVIAL(debug) << "cought exception: " << e.what();
}
}
queue_listener.h
#pragma once
#ifndef __QUEUE_LISTENER_H__
#define __QUEUE_LISTENER_H__
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <boost/asio.hpp>
#include <boost/system/error_code.hpp>
namespace boost {
namespace asio {
// A simple thread-safe FIFO queue, used as the default queue type for the queue listener.
// All member functions lock an internal mutex. deq() blocks until an item is
// available or dequeuing has been disabled via disable_deq(true).
// T must be default-constructible (deq() returns T{} when it yields no item);
// it only needs to be movable, not copyable.
template<typename T, typename Container = std::queue<T>>
class simple_queue {
public:
    // type of the values stored in the queue
    // (needed so clients can create an item with the default ctor)
    using value_type = T;

    // ctors, assign, dtor
    // The mutex and condition variable members can be neither copied nor moved,
    // so the queue itself is neither copyable nor movable.
    simple_queue() = default;
    simple_queue(simple_queue const&) = delete;
    simple_queue(simple_queue&&) = delete;
    simple_queue& operator=(simple_queue const&) = delete;
    simple_queue& operator=(simple_queue&&) = delete;
    ~simple_queue() = default;

    // Put a message into the queue and wake up all threads blocked in deq().
    void enq(T t) {
        std::lock_guard<std::mutex> lock(mtx_);
        q_.push(std::move(t)); // t is our own copy: move it instead of copying again
        cond_.notify_all();
    }

    // Dequeue a message, blocking until one is available or deq is disabled.
    // Returns {true, item} on success. Returns {false, T{}} if dequeuing is
    // disabled - even when items are still queued.
    std::pair<bool, T> deq() {
        std::unique_lock<std::mutex> lock(mtx_);
        cond_.wait(lock, [this]() { return !deq_enabled_ || !q_.empty(); });
        // if deq is disabled or queue is empty return
        if (!deq_enabled_ || q_.empty()) {
            return std::pair<bool, T>(false, T{});
        }
        // Take the front element; it is popped right away, so moving out of it is safe.
        std::pair<bool, T> ret(true, std::move(q_.front()));
        q_.pop();
        return ret;
    }

    // Disable (true) or re-enable (false) deq operations.
    // Wakes every thread blocked in deq() so it can observe the new state.
    void disable_deq(bool disable) {
        std::lock_guard<std::mutex> lock(mtx_);
        deq_enabled_ = !disable;
        cond_.notify_all();
    }

    // check if queue is empty
    bool empty() const {
        std::lock_guard<std::mutex> lock(mtx_);
        return q_.empty();
    }

private:
    mutable std::mutex mtx_;               // guards deq_enabled_ and q_
    mutable std::condition_variable cond_; // signalled on enq() and disable_deq()
    bool deq_enabled_ = true;
    Container q_;
};
// Forward declarations.
// queue_listener_impl is defined further down in this header; it is declared
// here because it is the default Impl argument of the service template.
class queue_listener_impl;
template<typename Impl = queue_listener_impl>class basic_queue_listener_service;
// --- IO Object (used by client) -----------------------------
// Client-facing I/O object. It owns no logic of its own: every operation is
// forwarded to the Service registered with the io_service, together with this
// object's implementation handle.
//   Service - service type providing async_deq(implementation, queue, handler)
//   Queue   - queue type the listener dequeues from
template<typename Service, typename Queue>
class basic_queue_listener : public boost::asio::basic_io_object<Service> {
public:
    // ctor
    explicit basic_queue_listener(boost::asio::io_service& io_service) :
        boost::asio::basic_io_object<Service>(io_service) {
    }

    // Start an asynchronous dequeue from q; handler receives the result.
    // Both arguments are taken by value and moved on, so they are not copied
    // again at this forwarding layer.
    template <typename Handler>
    void async_deq(std::shared_ptr<Queue> q, Handler handler) {
        this->get_service().async_deq(this->get_implementation(), std::move(q), std::move(handler));
    }
};
// Convenience alias: a queue listener that uses the default service
// (basic_queue_listener_service with its default Impl) and listens on a
// simple_queue<T>.
template<typename T>
using simple_queue_listener=basic_queue_listener<basic_queue_listener_service<>, simple_queue<T>>;
// --- service class -----------------------------
// (for one io_service, only one object created)
// Service that creates and destroys the per-listener implementation objects
// and forwards asynchronous operations to them.
//   Impl - implementation type; must be constructible from an io_service& and
//          provide async_deq(std::shared_ptr<Impl>, std::shared_ptr<Queue>, Handler).
template<typename Impl>
class basic_queue_listener_service : public boost::asio::io_service::service {
public:
    // required to have id of service
    static boost::asio::io_service::id id;

    // ctor
    explicit basic_queue_listener_service(boost::asio::io_service& io_service) :
        boost::asio::io_service::service(io_service) {
    }

    // dtor
    ~basic_queue_listener_service() = default;

    // handle type stored inside each I/O object
    using implementation_type = std::shared_ptr<Impl>;

    // mandatory (construct an implementation object)
    // The implementation is handed the io_service this service belongs to.
    void construct(implementation_type& impl) {
        impl = std::make_shared<Impl>(this->get_io_context());
    }

    // mandatory (destroy an implementation object)
    // Only drops this handle's reference; the object goes away once no other
    // shared_ptr refers to it.
    void destroy(implementation_type& impl) {
        impl.reset();
    }

    // asynchronous deq operation
    template <typename Handler, typename Queue>
    void async_deq(implementation_type& impl, std::shared_ptr<Queue> q, Handler handler) {
        // this is a non-blocking operation so we are OK calling impl object in this thread
        impl->async_deq(impl, std::move(q), std::move(handler));
    }

private:
    // shutdown service (required); nothing to release here
    void shutdown_service() {
    }
};
// definition of id of service (required)
// Out-of-class definition of the static `id` member declared in
// basic_queue_listener_service; each Impl instantiation gets its own id object.
template <typename Impl>
boost::asio::io_service::id basic_queue_listener_service<Impl>::id;
// --- implementation -----------------------------
class queue_listener_impl {
public:
// ctor (set up work queue for io_service so we don't bail out when executing run())
queue_listener_impl(boost::asio::io_service& post_io_service) :
impl_work_(new boost::asio::io_service::work(impl_io_service_)),
impl_thread_([&]() {impl_io_service_.run();}),
post_io_service_(post_io_service) {
}
// dtor (clear work queue, stop io service and join thread)
~queue_listener_impl() {
impl_work_.reset(nullptr);
impl_io_service_.stop();
if (impl_thread_.joinable())impl_thread_.join();
}
public:
// deque message (post request to thread)
template<typename Handler, typename Queue>
void async_deq(std::shared_ptr<queue_listener_impl>impl, std::shared_ptr<Queue>tq, Handler handler) {
impl_io_service_.post(deq_operation<Handler, Queue>(impl, post_io_service_, tq, handler));
}
private:
// function object calling blocking deq() on queue
template <typename Handler, typename Queue>
class deq_operation {
public:
// ctor
deq_operation(std::shared_ptr<queue_listener_impl>impl, boost::asio::io_service& io_service, std::shared_ptr<Queue>tq, Handler handler) :
wimpl_(impl), io_service_(io_service), work_(io_service), tq_(tq), handler_(handler) {
}
// function calling implementation object - runs in the thread created in ctor
void operator()() {
// make sure implementation object is still valid
std::shared_ptr<queue_listener_impl>impl{ wimpl_.lock() };
// if valid, go ahead and do blocking call on queue, otherwise post aborted message
if (impl) {
std::pair<bool, typename Queue::value_type>ret{ tq_->deq() };
boost::system::error_code ec = (!ret.first ? boost::asio::error::operation_aborted : boost::system::error_code());
this->io_service_.post(boost::asio::detail::bind_handler(handler_, ec, ret.second));
}
else {
// wacethis->io_service_.post(boost::asio::detail::bind_handler(handler_, boost::asio::error::operation_aborted, typename Queue::value_type()));
}
}
private:
std::weak_ptr<queue_listener_impl>wimpl_;
boost::asio::io_service& io_service_;
boost::asio::io_service::work work_;
std::shared_ptr<Queue>tq_;
Handler handler_;
};
// private data
boost::asio::io_service impl_io_service_;
std::unique_ptr<boost::asio::io_service::work>impl_work_;
std::thread impl_thread_;
boost::asio::io_service& post_io_service_;
};
}
}
#endif
Ошибка:
1> D: \ software \ libs \ boost \ boost \ asio \ detail \ bind_handler.hpp (165 , 1): ошибка C2893: не удалось специализировать шаблон функции 'неизвестного типа std :: _ Binder, boost :: asio :: simple_queue >>>> *, std :: shared_ptr >>>>), const boost :: arg < 1> &, const boost :: arg <2> &, boost :: asio :: basic_queue_listener, boost :: asio :: simple_queue >>>> * &, std :: shared_ptr >>>> &> :: operator ( ) (_ Unbound && ...) const '1> D: \ software \ libs \ boost \ boost \ asio \ detail \ bind_handler.hpp (165,1): ошибка C2893: с 1> D: \ software \ libs \ boost \ boost \ asio \ detail \ bind_handler.hpp (165,1): ошибка C2893: [1> D: \ software \ libs \ boost \ boost \ asio \ detail \ bind_handler.hpp (165,1): ошибка C2893: T = std :: string, 1> D: \ software \ libs \ boost \ boost \ asio \ detail \ bind_handler.hpp (165,1): ошибка C2893: _Ty = std :: string 1> D: \ software \ libs \ boost \ boost \ asio \ detail \ bind_handler.hpp (165,1): ошибка C2893:] 1> D: \ software \ libs \ boost \ boost \ asio \ detail \ bind_handler.hpp (165,1): сообщение: со следующими аргументами шаблона: 1> D : \ software \ libs \ boost \ boost \ asio \ detail \ bind_handler.hpp (165,1): message: '_Unbound = {const Arg1 &, const Arg2 &}' 1> Сборка проекта "asn1test_1.vcxproj" завершена с ошибкой. ========== Сборка: 0 успешных, 1 неудачных, 0 актуальных, 0 пропущенных ==========
К сожалению, мои знания о boost::asio
недостаточно глубоки, чтобы найти причину этой проблемы ...