Сделать подписку IObservable одновременной - PullRequest
1 голос
/ 15 января 2020

У меня есть следующий код

string dataDirectory = _settingsProvider.DataSettings.BaseDirectory;
_solverManagementService.MergedPointCloudProducer(dataDirectory, cancellationToken)
    .Subscribe(PointCloudMergerCompleted);

, где SolverManagementService _solverManagementService равно

Public class SolverManagementService : ISolverManagementService
{
    public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory,
        CancellationToken token)
    {
        return Observable.Create<IPointCloud>(
            observer =>
            {
                PairCollectionProducer(dataDirectory, token)
                    .Subscribe(pairCollection =>
                    {
                        observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(
                            pairCollection, token));
                    },
                    onCompleted: () =>
                    {
                        observer.OnCompleted();
                    });
                return () => { };
            });
    }
    ... // Other methods. 
}

Но здесь _icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token) стоит дорого, и хотя это возвращает Task<IPointCloud> Я не Threadify это и этот вызов блоков. Поскольку RecursivelyMergeAsync возвращает Task<IPointCloud>, его можно ожидать, поэтому я изменил код для использования async/await

public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory,
    CancellationToken token)
{
    return Observable.Create<IPointCloud>(
        observer =>
        {
            PairCollectionProducer(dataDirectory, token)
                .Subscribe(async (pairCollection) =>
                {
                    observer.OnNext(await _icpBatchSolverService.RecursivelyMergeAsync(
                        pairCollection, token));
                },
                onCompleted: () =>
                {
                    observer.OnCompleted();
                });
            return () => { };
        });
}

, но теперь он немедленно возвращается и консольное приложение закрывается. Я уверен, что это можно сделать без необходимости Semephores, но я новичок в RX. Как настроить одновременное выполнение RecursivelyMergeAsync для каждого возвращенного pairCollection без блокировки и получения уведомления о завершении всех рекурсивных слияний?

Примечание. В модульном тесте я делаю следующее

public class IcpBatchSolverServiceTests
{
    private Mock<ISettingsProvider> _mockSettingsProvider; 
    private IIcpBatchSolverService _icpBatchSolverService;

    [OneTimeSetUp]
    public void Setup()
    {
        _mockSettingsProvider = new Mock<ISettingsProvider>();

        _mockSettingsProvider.Setup(m => m.IcpSolverSettings).Returns(new IcpSolverSettings());
        _mockSettingsProvider.Object.IcpSolverSettings.MaximumDegreeOfParallelism = 6;

        Log.Logger = new LoggerConfiguration()
            .WriteTo.Console()
            .CreateLogger();

        var serviceProvider = new ServiceCollection()
            .AddLogging(builder =>
            {
                builder.SetMinimumLevel(LogLevel.Trace);
                builder.AddSerilog(Log.Logger);
            })
            .BuildServiceProvider();

        ILogger<IcpBatchSolverServiceTests> logger = serviceProvider
            .GetService<ILoggerFactory>()
            .CreateLogger<IcpBatchSolverServiceTests>();

        _icpBatchSolverService = new IcpBatchSolverService(_mockSettingsProvider.Object, logger);
    }

    [Test]
    public async Task CanSolveBatchAsync()
    {
        IPointCloud @static = PointCloudFactory.GetRandomPointCloud(1000);
        List<IPointCloud> pointCloudList = PointCloudFactory.GenerateRandomlyRotatedBatch(@static, 12);

        IPartitioningService<IPointCloud> ps = new PointCloudPartitioningService();
        IPointCloud result = await _icpBatchSolverService.RecursivelyMergeAsync(ps.Partition(pointCloudList), CancellationToken.None);

        Assert.AreEqual(@static.Vertices.Length, result.Vertices.Length);
    }
}

И это обрабатывается одновременно идеально.


Редактировать. Описание того, какую обработку мне нужно выполнить, когда предоставляется папка с файлами для разных геометрий (карты глубины для разных геометрий под разными углами) с соглашением об именах .NNNN.exr, где NNNN - это какое-либо число c. Для пакета файлов.

  1. Пакет этих файлов в коллекции с использованием имени файла для различных геометрий.

foreach пакет файлов

  1. [* Serial *] Вызовите C ++ API для извлечения DepthMaps из файлов изображений.
  2. [* Parallel *] Конвертировать DepthMaps до PointClouds. это можно сделать сразу.
  3. [* Parallel *] Объединить PointClouds с использованием алгоритма ICP (дорого), но ограничить параллелизм с TaskScheduler двумя потоками (выбирается в зависимости от архитектуры машины / память et c.)

В конце этого я делаю еще один вызов C ++ API с облаком точек слияния из шага 3. Так что в RX мой текущий полный конвейер выглядит как

public class SolverManagementService : ISolverManagementService
{
    private readonly IIcpBatchSolverService _icpBatchSolverService;
    private readonly IDepthMapToPointCloudAdapter _pointCloudAdapter;
    private readonly ILogger<SolverManagementService> _logger;

    public SolverManagementService(
        IIcpBatchSolverService icpBatchSolverService,
        IDepthMapToPointCloudAdapter pointCloudAdapter,
        ILogger<SolverManagementService> logger)
    {
        _icpBatchSolverService = icpBatchSolverService ?? throw new ArgumentNullException("icpBatchSolverService cannot be null");
        _pointCloudAdapter = pointCloudAdapter ?? throw new ArgumentNullException("pointCloudAdapter cannot be null");
        _logger = logger; 
    }

    public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory, CancellationToken token)
    {
        return Observable.Create<IPointCloud>(
            observer =>
            {
                PairCollectionProducer(dataDirectory, token)
                    .Subscribe(pairCollection =>
                    {
                        observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token).Result);
                    },
                    onCompleted: () =>
                    {
                        observer.OnCompleted();
                    });
                return () => { };
            });
    }

    public IObservable<PairCollection<IPointCloud>> PairCollectionProducer(string dataDirectory, CancellationToken token)
    {
        return Observable.Create<PairCollection<IPointCloud>>(
            observer =>
            {
                Parallel.ForEach(
                    Utils.GetFileBatches(dataDirectory), 
                    (fileBatch) =>
                {
                    var producer = RawDepthMapProducer(fileBatch, token);
                    ConcurrentBag<IPointCloud> bag = new ConcurrentBag<IPointCloud>();

                    producer.Subscribe(rawDepthMap =>
                    {
                        bag.Add(_pointCloudAdapter.GetPointCloudFromDepthMap(rawDepthMap));
                        _logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: {bag.Count:N0} PointCloud(s) added to concurrent bag");
                    }, 
                    onCompleted: () =>
                    {
                        PointCloudPartitioningService ps = new PointCloudPartitioningService();
                        observer.OnNext(ps.Partition(bag.ToList()));

                        _logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: PointCloud PairCollection generated " +
                            $"for file set \"{Path.GetFileNameWithoutExtension(bag.FirstOrDefault().Source)}\"");
                    });
                });
                observer.OnCompleted();
                return () => { };
            });
    }

    public IObservable<RawDepthMap> RawDepthMapProducer(List<string> filePaths, CancellationToken token)
    {
        return Observable.Create<RawDepthMap>(
            observer =>
            {
                int index = 0;
                foreach(var filePath in filePaths)
                {
                    token.ThrowIfCancellationRequested();
                    var extractor = DepthMapExtractorFactory.GetDepthMapExtractor(filePath);

                    observer.OnNext(extractor.GetDepthMap(filePath, index++));
                    _logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: DepthMap extracted from \"{filePath}\"");
                }
                observer.OnCompleted();
                return () => { };
            });
    }
}

Я ищу: 1. Что не так с моим кодом выше примечание _icpBatchSolverService.RecursivelyMergeAsync возвращает Task<IPointCloud и является одновременным, и я хотел бы, чтобы эта буксировка выполнялась одновременно , 2. Что еще не так с моим кодом?

1 Ответ

4 голосов
/ 15 января 2020

Я собираюсь оставить обобщенный c ответ, потому что приведенный выше код слишком велик, чтобы сводить его к нулю.

Существует два синтаксиса, которые могут использоваться для определения асинхронного поведения. Первый - это шаблон async/await, а второй и более старый - шаблон Subscribe() (реактивный).

Является ли асинхронный тем же, что и параллельный?

Нет, это точно не так. Для тех, кто может читать это, кто не знает, асинхронный означает «это происходит позже», а не «это происходит одновременно». Используя любой из этих синтаксисов, вы определяете поведение, которое происходит сразу после того, как какой-то предикат был встречен. Очень распространенным вариантом использования является обработка ответа, возвращаемого с веб-сервера. Вам нужно сделать запрос, а затем сделать что-нибудь, когда ответ вернется.

Параллельность отличается. Вы можете вызвать параллелизм, например, используя Task.Run() или Parallel.ForEach(). В обоих случаях вы определяете вилку. В случае Task.Run вы можете затем сделать Task.WaitAll. В случае Parallel.ForEach, он сделает форк / соединение за вас. Конечно, у реактивного есть свой собственный набор операций разветвления / соединения.

Что происходит, когда я жду или подписываюсь?

Следующие две строки кода имеют одинаковые значения поведение, и это поведение сбивает с толку большое количество программистов:

var result = await myAsync();

myObservable.Subscribe(result => { ... });

В обоих случаях поток управления программы движется в предсказуемой , но потенциально запутанная мода. В первом случае поток управления возвращается обратно к родительскому вызывающему, пока ожидается await. Во втором поток управления переходит к следующей строке кода с лямбда-выражением, которое вызывается при возврате результата.

Обычная вещь, которую я видел среди людей, изучающих, как их использовать, это попытаться назначить переменную из лямбда-адреса для адреса в родительской области. Это не сработает, потому что эта область перестанет существовать задолго до выполнения лямбды. С меньшей вероятностью сделать что-то глупое с использованием async/await, но вы также должны помнить, что поток управления будет go увеличивать стек вызовов до тех пор, пока не будет определена следующая синхронная операция. Эта статья объясняет это немного глубже , а эту статью немного легче понять.

...