наблюдаем за использованием / сохраняемостью порядка по разным предметам - PullRequest
0 голосов
/ 03 июня 2019

Проблема, которую я пытаюсь решить, заключается в том, как правильно использовать Наблюдение за подпиской на несколько объектов в другом потоке, но поддерживать порядок, в котором они были запущены в исходном потоке.Субъекты запускаются в одном и том же потоке, я просто пытаюсь переместить их результаты во 2-й поток, сохраняя порядок относительно друг друга.

Я обнаружил, что подписка на один предмет икажется, что использование visible_on для перемещения результатов в другой поток сохраняет порядок, как и ожидалось.Тем не менее, когда два разных субъекта подписываются и направляют одного и того же работника / координатора событий через наблюдаемое, их порядок относительно друг друга, по-видимому, теряется.(по крайней мере, с настройками, которые я пробовал до сих пор)

Общая настройка:

  1. Создание нескольких предметов
  2. Подписаться на эти предметы и печатать, когда ониfired
  3. Использование приведения в соответствие наблюдаемого к одному работнику, чтобы обратные вызовы подписки на печать запускались в том же порядке, в каком они происходили в фоновом потоке, несмотря на перемещение в другой поток.
  4. Запуск субъектов нафоновый поток

Более конкретный пример (используя немного QT, но суть psuedocode должна быть понятна):

class TestThread : public QThread
{
public:
  TestThread();

  // Starts the thread
  void run();

  rxcpp::subjects::subject<int> spam_;
  rxcpp::subjects::subject<bool> done_;
};

void TestThread::run() {
  for (int i = 0; i < 1000; ++i) {
    spam_.get_subscriber().on_next(i);
  }
  LOG("Firing finished");
  done_.get_subscriber().on_next(true);
}
/// Main thread
// Create background thread
  auto background_thread = new TestThread();

// Set up coordinator to run on one worker
  rxcpp::schedulers::other_thread;
  rxcpp::composite_subscription sub;
  auto worker = other_thread.create_worker(sub);
  auto coord = rxcpp::serialize_same_worker(worker );

  thread_->spam_.get_observable()
    .observe_on(coord)
    .subscribe([=](int i)
    {
      LOG("Spam %i", i);
    });
  thread_->done_.get_observable()
    .observe_on(coord)
    .subscribe([=](bool)
    {
      LOG("DONE");
    });

// Fire subjects on background thread
background_thread.start();

Результаты, которые я пытаюсьдля достижения будет выглядеть примерно так:

Spam 0
Spam 1
...
Firing finished // At some point the background thread is done with the loop, cool
...
Spam 45
...
Spam 999
DONE // After all spam events have fired, we see the DONE subject's event
// End of file

Вместо этого я вижу, что DONE чередуется и запускается рано:

Spam 0
Spam 1
...
Firing finished // At some point the background thread is done with the loop, cool
...
Spam 45
DONE // Oh no
Spam 46
...
Spam 999
// End of file

Приведенные выше результаты имеют смысл для меня, если spam_ и done_ субъекты запускались из разных потоков, поскольку блокировка очереди работника не обеспечивалась в надлежащем порядке;тот, кто получит мьютекс первым, будет в очереди следующий, что может быть не в порядке.

Однако в приведенном выше примере, поскольку оба субъекта запускаются из одного потока, я мог бы предположить, что все спам-события будут иметьпоставили в очередь на работника до того, как было выполнено событие done, так как между ними нет никакой нити.Поскольку работник выполнил события в планировщике, я бы тогда ожидал, что событие done_ всегда будет выполняться последним.

Вот соответствующая цитата из руководства разработчика, которая заставляет меня чувствовать, что работник долженуважать порядок: worker owns a queue of pending schedulables for the timeline and has a lifetime. When the time for an schedulable is reached the schedulable is run. The queue maintains insertion order so that when N schedulables have the same target time they are run in the order that they were inserted into the queue.

Есть ли что-то, что я неправильно понимаю в этой общей механике?Существует ли какая-либо настройка планировщика, которую я мог бы использовать для создания согласованной очереди для нескольких подписок на субъекты?

Код, который я разместил, является лишь одним из нескольких других маршрутов, которые я пробовал.До сих пор не было никакой комбинации для работы.

Заранее благодарим за любую помощь или понимание!8)

...