У меня есть следующий код
string dataDirectory = _settingsProvider.DataSettings.BaseDirectory;
_solverManagementService.MergedPointCloudProducer(dataDirectory, cancellationToken)
.Subscribe(PointCloudMergerCompleted);
, где SolverManagementService _solverManagementService
равно
Public class SolverManagementService : ISolverManagementService
{
public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory,
CancellationToken token)
{
return Observable.Create<IPointCloud>(
observer =>
{
PairCollectionProducer(dataDirectory, token)
.Subscribe(pairCollection =>
{
observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(
pairCollection, token));
},
onCompleted: () =>
{
observer.OnCompleted();
});
return () => { };
});
}
... // Other methods.
}
Но здесь _icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token)
стоит дорого, и хотя это возвращает Task<IPointCloud>
Я не Threadify это и этот вызов блоков. Поскольку RecursivelyMergeAsync
возвращает Task<IPointCloud>
, его можно ожидать, поэтому я изменил код для использования async/await
public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory,
CancellationToken token)
{
return Observable.Create<IPointCloud>(
observer =>
{
PairCollectionProducer(dataDirectory, token)
.Subscribe(async (pairCollection) =>
{
observer.OnNext(await _icpBatchSolverService.RecursivelyMergeAsync(
pairCollection, token));
},
onCompleted: () =>
{
observer.OnCompleted();
});
return () => { };
});
}
, но теперь он немедленно возвращается и консольное приложение закрывается. Я уверен, что это можно сделать без необходимости Semephores
, но я новичок в RX. Как настроить одновременное выполнение RecursivelyMergeAsync
для каждого возвращенного pairCollection
без блокировки и получения уведомления о завершении всех рекурсивных слияний?
Примечание. В модульном тесте я делаю следующее
public class IcpBatchSolverServiceTests
{
private Mock<ISettingsProvider> _mockSettingsProvider;
private IIcpBatchSolverService _icpBatchSolverService;
[OneTimeSetUp]
public void Setup()
{
_mockSettingsProvider = new Mock<ISettingsProvider>();
_mockSettingsProvider.Setup(m => m.IcpSolverSettings).Returns(new IcpSolverSettings());
_mockSettingsProvider.Object.IcpSolverSettings.MaximumDegreeOfParallelism = 6;
Log.Logger = new LoggerConfiguration()
.WriteTo.Console()
.CreateLogger();
var serviceProvider = new ServiceCollection()
.AddLogging(builder =>
{
builder.SetMinimumLevel(LogLevel.Trace);
builder.AddSerilog(Log.Logger);
})
.BuildServiceProvider();
ILogger<IcpBatchSolverServiceTests> logger = serviceProvider
.GetService<ILoggerFactory>()
.CreateLogger<IcpBatchSolverServiceTests>();
_icpBatchSolverService = new IcpBatchSolverService(_mockSettingsProvider.Object, logger);
}
[Test]
public async Task CanSolveBatchAsync()
{
IPointCloud @static = PointCloudFactory.GetRandomPointCloud(1000);
List<IPointCloud> pointCloudList = PointCloudFactory.GenerateRandomlyRotatedBatch(@static, 12);
IPartitioningService<IPointCloud> ps = new PointCloudPartitioningService();
IPointCloud result = await _icpBatchSolverService.RecursivelyMergeAsync(ps.Partition(pointCloudList), CancellationToken.None);
Assert.AreEqual(@static.Vertices.Length, result.Vertices.Length);
}
}
И это обрабатывается одновременно идеально.
Редактировать. Описание того, какую обработку мне нужно выполнить, когда предоставляется папка с файлами для разных геометрий (карты глубины для разных геометрий под разными углами) с соглашением об именах .NNNN.exr, где NNNN - это какое-либо число c. Для пакета файлов.
- Пакет этих файлов в коллекции с использованием имени файла для различных геометрий.
foreach пакет файлов
- [* Serial *] Вызовите C ++ API для извлечения DepthMaps из файлов изображений.
- [* Parallel *] Конвертировать
DepthMaps
до PointClouds
. это можно сделать сразу. - [* Parallel *] Объединить PointClouds с использованием алгоритма ICP (дорого), но ограничить параллелизм с
TaskScheduler
двумя потоками (выбирается в зависимости от архитектуры машины / память et c.)
В конце этого я делаю еще один вызов C ++ API с облаком точек слияния из шага 3. Так что в RX мой текущий полный конвейер выглядит как
public class SolverManagementService : ISolverManagementService
{
private readonly IIcpBatchSolverService _icpBatchSolverService;
private readonly IDepthMapToPointCloudAdapter _pointCloudAdapter;
private readonly ILogger<SolverManagementService> _logger;
public SolverManagementService(
IIcpBatchSolverService icpBatchSolverService,
IDepthMapToPointCloudAdapter pointCloudAdapter,
ILogger<SolverManagementService> logger)
{
_icpBatchSolverService = icpBatchSolverService ?? throw new ArgumentNullException("icpBatchSolverService cannot be null");
_pointCloudAdapter = pointCloudAdapter ?? throw new ArgumentNullException("pointCloudAdapter cannot be null");
_logger = logger;
}
public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory, CancellationToken token)
{
return Observable.Create<IPointCloud>(
observer =>
{
PairCollectionProducer(dataDirectory, token)
.Subscribe(pairCollection =>
{
observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token).Result);
},
onCompleted: () =>
{
observer.OnCompleted();
});
return () => { };
});
}
public IObservable<PairCollection<IPointCloud>> PairCollectionProducer(string dataDirectory, CancellationToken token)
{
return Observable.Create<PairCollection<IPointCloud>>(
observer =>
{
Parallel.ForEach(
Utils.GetFileBatches(dataDirectory),
(fileBatch) =>
{
var producer = RawDepthMapProducer(fileBatch, token);
ConcurrentBag<IPointCloud> bag = new ConcurrentBag<IPointCloud>();
producer.Subscribe(rawDepthMap =>
{
bag.Add(_pointCloudAdapter.GetPointCloudFromDepthMap(rawDepthMap));
_logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: {bag.Count:N0} PointCloud(s) added to concurrent bag");
},
onCompleted: () =>
{
PointCloudPartitioningService ps = new PointCloudPartitioningService();
observer.OnNext(ps.Partition(bag.ToList()));
_logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: PointCloud PairCollection generated " +
$"for file set \"{Path.GetFileNameWithoutExtension(bag.FirstOrDefault().Source)}\"");
});
});
observer.OnCompleted();
return () => { };
});
}
public IObservable<RawDepthMap> RawDepthMapProducer(List<string> filePaths, CancellationToken token)
{
return Observable.Create<RawDepthMap>(
observer =>
{
int index = 0;
foreach(var filePath in filePaths)
{
token.ThrowIfCancellationRequested();
var extractor = DepthMapExtractorFactory.GetDepthMapExtractor(filePath);
observer.OnNext(extractor.GetDepthMap(filePath, index++));
_logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: DepthMap extracted from \"{filePath}\"");
}
observer.OnCompleted();
return () => { };
});
}
}
Я ищу: 1. Что не так с моим кодом выше примечание _icpBatchSolverService.RecursivelyMergeAsync
возвращает Task<IPointCloud
и является одновременным, и я хотел бы, чтобы эта буксировка выполнялась одновременно , 2. Что еще не так с моим кодом?