Проблема, которую я пытаюсь решить, заключается в том, как правильно использовать Наблюдение за подпиской на несколько объектов в другом потоке, но поддерживать порядок, в котором они были запущены в исходном потоке.Субъекты запускаются в одном и том же потоке, я просто пытаюсь переместить их результаты во 2-й поток, сохраняя порядок относительно друг друга.
Я обнаружил, что подписка на один предмет икажется, что использование visible_on для перемещения результатов в другой поток сохраняет порядок, как и ожидалось.Тем не менее, когда два разных субъекта подписываются и направляют одного и того же работника / координатора событий через наблюдаемое, их порядок относительно друг друга, по-видимому, теряется.(по крайней мере, с настройками, которые я пробовал до сих пор)
Общая настройка:
- Создание нескольких предметов
- Подписаться на эти предметы и печатать, когда ониfired
- Использование приведения в соответствие наблюдаемого к одному работнику, чтобы обратные вызовы подписки на печать запускались в том же порядке, в каком они происходили в фоновом потоке, несмотря на перемещение в другой поток.
- Запуск субъектов нафоновый поток
Более конкретный пример (используя немного QT, но суть psuedocode должна быть понятна):
class TestThread : public QThread
{
public:
TestThread();
// Starts the thread
void run();
rxcpp::subjects::subject<int> spam_;
rxcpp::subjects::subject<bool> done_;
};
void TestThread::run() {
for (int i = 0; i < 1000; ++i) {
spam_.get_subscriber().on_next(i);
}
LOG("Firing finished");
done_.get_subscriber().on_next(true);
}
/// Main thread
// Create background thread
auto background_thread = new TestThread();
// Set up coordinator to run on one worker
rxcpp::schedulers::other_thread;
rxcpp::composite_subscription sub;
auto worker = other_thread.create_worker(sub);
auto coord = rxcpp::serialize_same_worker(worker );
thread_->spam_.get_observable()
.observe_on(coord)
.subscribe([=](int i)
{
LOG("Spam %i", i);
});
thread_->done_.get_observable()
.observe_on(coord)
.subscribe([=](bool)
{
LOG("DONE");
});
// Fire subjects on background thread
background_thread.start();
Результаты, которые я пытаюсьдля достижения будет выглядеть примерно так:
Spam 0
Spam 1
...
Firing finished // At some point the background thread is done with the loop, cool
...
Spam 45
...
Spam 999
DONE // After all spam events have fired, we see the DONE subject's event
// End of file
Вместо этого я вижу, что DONE чередуется и запускается рано:
Spam 0
Spam 1
...
Firing finished // At some point the background thread is done with the loop, cool
...
Spam 45
DONE // Oh no
Spam 46
...
Spam 999
// End of file
Приведенные выше результаты имеют смысл для меня, если spam_
и done_
субъекты запускались из разных потоков, поскольку блокировка очереди работника не обеспечивалась в надлежащем порядке;тот, кто получит мьютекс первым, будет в очереди следующий, что может быть не в порядке.
Однако в приведенном выше примере, поскольку оба субъекта запускаются из одного потока, я мог бы предположить, что все спам-события будут иметьпоставили в очередь на работника до того, как было выполнено событие done, так как между ними нет никакой нити.Поскольку работник выполнил события в планировщике, я бы тогда ожидал, что событие done_
всегда будет выполняться последним.
Вот соответствующая цитата из руководства разработчика, которая заставляет меня чувствовать, что работник долженуважать порядок: worker owns a queue of pending schedulables for the timeline and has a lifetime. When the time for an schedulable is reached the schedulable is run. The queue maintains insertion order so that when N schedulables have the same target time they are run in the order that they were inserted into the queue.
Есть ли что-то, что я неправильно понимаю в этой общей механике?Существует ли какая-либо настройка планировщика, которую я мог бы использовать для создания согласованной очереди для нескольких подписок на субъекты?
Код, который я разместил, является лишь одним из нескольких других маршрутов, которые я пробовал.До сих пор не было никакой комбинации для работы.
Заранее благодарим за любую помощь или понимание!8)