Реактивный - как комбинировать / объединять / искать элементы с двумя последовательностями - PullRequest
0 голосов
/ 31 октября 2019

Я подключаюсь к веб-сервису, который дает мне все цены за день (без информации о времени). Каждый из этих ценовых результатов имеет идентификатор для соответствующего «пакетного прогона».

«Пакетный прогон» имеет отметку «дата + время», но мне нужно сделать отдельный вызов, чтобы получить всю информацию о пакете за день.

Следовательно, чтобы получить фактическое время каждого результата, мне нужно объединить два вызова API.

Я использую Reactive для этого, но я не могу надежно объединить два набораданных. Я думал, что CombineLatest сделает это, но, похоже, он не работает так, как я думал (основываясь на http://reactivex.io/documentation/operators/combinelatest.html, http://introtorx.com/Content/v1.0.10621.0/12_CombiningSequences.html#CombineLatest).

    [TestMethod]
    public async Task EvenMoreBasicCombineLatestTest()
    {
        int batchStart = 100, batchCount = 10;

        //create 10 results with batch ids [100, 109]
        //the test uses lists just to make debugging easier
        var resultsWithBatchIdList = Enumerable.Range(batchStart, batchCount)
            .Select(id => new { BatchRunId = id, ResultValue = id * 10 })
            .ToList();
        var resultsWithBatchId = Observable.ToObservable(resultsWithBatchIdList);
        Assert.AreEqual(batchCount, await resultsWithBatchId.Count());

        //create 10 batches with ids [100, 109]
        var batchesList = Enumerable.Range(batchStart, batchCount)
            .Select(id => new
            {
                ThisId = id,
                BatchName = String.Concat("abcd", id)
            })
            .ToList();
        var batchesObservable = Observable.ToObservable(batchesList);
        Assert.AreEqual(batchCount, await batchesObservable.Count());

        //turn the batch set into a dictionary so we can look up each batch by its id
        var batchRunsByIdObservable = batchesObservable.ToDictionary(batch => batch.ThisId);

        //for each result, look up the corresponding batch id in the dictionary to join them together
        var resultsWithCorrespondingBatch =
            batchRunsByIdObservable
            .CombineLatest(resultsWithBatchId, (batchRunsById, result) =>
            {
                Assert.AreEqual(NumberOfResultsToCreate, batchRunsById.Count);

                var correspondingBatch = batchRunsById[result.BatchRunId];

                var priceResultAndSourceBatch = new 
                {
                    Result = result,
                    SourceBatchRun = correspondingBatch
                };
                return priceResultAndSourceBatch;
            });
        Assert.AreEqual(batchCount, await resultsWithCorrespondingBatch.Count());
    }

Я ожидаю, что каждыйЭлемент наблюдаемой «результаты» проходит, он объединяется с каждым элементом наблюдаемого словаря batch-id (который имеет только один элемент), но вместо этого он выглядит так, как будто присоединяется только последний элемент списка результатов.

Из-за этого у меня возникает более сложная проблема, но при попытке создать минимальное повторение даже это дает мне неожиданные результаты. Это происходит с версией 3.1.1, 4.0.0, 4.2.0 и т. Д.

(Обратите внимание, что последовательности обычно не совпадают, как в этом искусственном примере, поэтому я не могу просто Zip их.)

Так, как я могу сделать это объединение? Aпоток результатов, которые я хочу найти больше информации через словарь (который также исходит из Observable)?

Также обратите внимание, что цель состоит в том, чтобы вернуть IObservable (resultsWithCorrespondingBatch), поэтому я не могу простоawaitbatchRunsByIdObservable.

1 Ответ

1 голос
/ 31 октября 2019

Хорошо, думаю, я понял это. Хотелось бы, чтобы любая из двух мраморных диаграмм в документации была немного отлична - это сделало бы тонкость CombineLatest гораздо более очевидной:

N------1---2---3---
L--z--a------bc----

R------1---2-223---
       a   a bcc 

Это объединение последний - поэтому, в зависимости от того, когда предметы выбрасываются, можно пропустить несколько кортежей. То, что я должен был сделать, это SelectMany:

NO: .CombineLatest(resultsWithBatchId, (batchRunsById, result) =>
YES: .SelectMany(batchRunsById => resultsWithBatchId.Select(result =>

Обратите внимание, что порядок "соединения" важен: A.SelectMany (B) vs B.SelectMany (A) - если A имеет 1 элемент, а B имеет 100 элементов, последний вызовет 100 вызовов для подписки на A.

...