Как заархивировать / объединить по ключевым источникам, используя .net реактивный? - PullRequest
0 голосов
/ 27 января 2019

У меня есть много источников (число известно только во время выполнения) в списке.Все источники излучают элементы одного типа (Данные).Как сгруппировать их по ключу (currentDate), который является одним из его свойств?Затем мне нужно преобразовать их в один другой элемент (FullData), только если все источники излучают допустимый элемент данных.Таким образом, FullData генерируется, только если каждый источник генерирует действительные данные для определенного DateTime.

class Program
{
    static void Main(string[] args)
    {
        var rand = new Random();
        List<IObservable<Data>> sources = new List<IObservable<Data>>();

        //let's assume that value comes from a user
        var sourcesCounter = 4;
        for (int i = 0; i < sourcesCounter; i++)
        {
            sources.Add(
                Observable.Interval(TimeSpan.FromSeconds(1))
                    .Select(e => new Data
                    {
                        currentDate = DateTime.Now, //let's assume it is round to seconds
                        Samples = new List<double>(1000),
                        IsValid = rand.Next(5) < 4 //Generate true/false randomly
                    })

            );
        }

        var merged = sources.Merge();
        merged.Subscribe(
            e =>
            {
                Console.WriteLine($"received: {e.currentDate.Second} {e.IsValid}");
            },
            ex => Console.WriteLine(ex),
            () => Console.WriteLine("Completed - merged")
        );

        Console.ReadKey();
    }
}

public class Data
{
    public DateTime currentDate { get; set; }
    public bool IsValid { get; set; }
    public List<double> Samples { get; set; }
}

public class FullData
{
    public DateTime currentDate { get; set; }
    public List<List<double>> Samples { get; set; }
}

Ответы [ 2 ]

0 голосов
/ 28 января 2019

Приведенный ниже код группирует данные по currentDate, пока он не получит недопустимые данные для определенного ключа.

var keys = new ConcurrentDictionary<DateTime, DateTime>();
var dataDictionary = new ConcurrentDictionary<DateTime, FullData>();
sources
    .Merge()
    .GroupByUntil(data => data.currentDate, s => s.Any(data => !data.IsValid)) // group by currentDate until an invalid data appears in the group (the last invalid data can be in this group)
    .Where(g => keys.TryAdd(g.Key, g.Key)) // skip the reborned groups for the same key (they are created because of durationSelector, which controls the lifetime of a group)
    .Merge() // switch to the previous flattened structure
    .Where(data => data.IsValid) // remove the last invalid item emitted by GroupByUntil
    .Subscribe(x =>
    {
        var fullData = dataDictionary.GetOrAdd(x.currentDate, f => new FullData { currentDate = x.currentDate, Samples = new List<List<double>>() });
        fullData.Samples.Add(x.Samples);

        Console.WriteLine($"received: {x.currentDate.ToLocalTime()} {x.IsValid} {string.Join(", ", x.Samples)}");
    }, () => Console.WriteLine("Completed"));

Console.ReadKey();

foreach (var item in dataDictionary)
{
    Console.WriteLine($"{item.Key.ToLocalTime()}, {string.Join(",", item.Value.Samples.SelectMany(t => t))}");
}

Если вы знаете, что все наблюдаемые последовательности конечны, и вам нужносоздать FullData только если каждый источник генерирует только действительные данные , вы можете использовать другой подход:

sources.Merge().ToList().Subscribe(list =>
{
    var fullDataList = list
        .GroupBy(data => data.currentDate)
        .Where(g => g.All(data => data.IsValid))
        .Select(g => new FullData { currentDate = g.Key, Samples = g.Select(data => data.Samples).ToList() });
    foreach (var fullDataItem in fullDataList)
    {
        Console.WriteLine($"{fullDataItem.currentDate.ToLocalTime()}, {string.Join(",", fullDataItem.Samples.SelectMany(t => t))}");
    }
});

Приведенный выше код ожидает завершения всех наблюдаемых, создает список всехполученные элементы и, наконец, генерирует FullData с помощью простого запроса LINQ.

0 голосов
/ 27 января 2019

Существует перегрузка Zip, которая требует IEnumerable<IObservable<Data>>.

Попробуйте это:

var rand = new Random();

var sourcesCounter = 4;

IEnumerable<IObservable<Data>> sources =
    Enumerable
        .Range(0, sourcesCounter)
        .Select(x =>
            Observable
                .Interval(TimeSpan.FromSeconds(1))
                .Select(e => new Data()
                {
                    currentDate = DateTime.Now, //let's assume it is round to seconds
                    Samples = new List<double>(1000),
                    IsValid = rand.Next(5) < 4 //Generate true/false randomly
                }));

    IObservable<IList<Data>> zipped = sources.Zip(values => values);
...