Получите несколько IEnumerables - PullRequest
2 голосов
/ 05 августа 2011

У меня есть кусок кода, который выполняет расчеты по активам.Их много миллионов, поэтому я хочу вычислить все в потоках.Мой текущий «конвейер» выглядит так:

У меня есть запрос, который выполняется как Datareader.

Тогда в моем классе активов есть конструктор, который принимает IDataReader;

Public Asset(IdataReader rdr){
  // logic that initiates fields
}

, и метод, который преобразует IDataReader в IEnumerable

public static IEnumerable<Asset> ToAssets(IDataReader rdr) {

    // make sure the reader is in the right formt
    CheckReaderFormat(rdr);

    // project reader into IEnumeable<Asset>
    while (rdr.Read()) yield return new Asset(rdr);

}

Этозатем передается в функцию, которая выполняет фактические вычисления, а затем проецирует ее в IEnumerable

, который затем получает обертку, выставляет ответы в качестве IDataReader и затем передается в OracleBulkCopy ипоток записывается в БД.

Пока что это работает как шарм.Из-за настройки я могу поменять DataReader на IEnumerable, который читает из файла, или записать результаты в файл и т. Д. Все в зависимости от того, как я связываю классы / функции вместе.

Теперь: есть несколько вещей, которые я могу вычислить, например, помимо обычного ответа, у меня может быть класс DebugAnswer, который также выводит некоторые промежуточные числа для отладки.Итак, я хотел бы проецировать IEnumerable в несколько потоков вывода, чтобы я мог добавить к ним «слушателей».Таким образом, мне не придется просматривать данные несколько раз.Как я могу это сделать?Вроде как иметь несколько Событий, а затем запускать определенный код, только если подключены слушатели.

Также иногда я пишу в БД, а также в zip-файл, чтобы сохранить резервную копию результатов.Итак, я хотел бы иметь 2 'слушателей' на IEnumerable.Один, который проецируется как IDataReader, а другой, который записывает прямо в файл.

Как вывести несколько выходных потоков и как разместить несколько прослушивателей в одном выходном потоке?Что позволяет мне составлять потоки таких данных?

edit

поэтому какой-то псевдокод того, что я хотел бы сделать:

foreach(Asset in Assets){
   if(DebugListener != null){
     // compute 
     DebugAnswer da = new DebugAnswer {result = 100};
     yield da to DebugListener;  // so instead of yield return yield to that stream

   }

   if(AnswerListener != null){
     // compute basic stuff 
     Answer a = new Answer { bla = 200 };
     yield a to AnswerListener;
   }
}

Заранее спасибо,

Gert* 1034-январь *

Ответы [ 5 ]

5 голосов
/ 05 августа 2011

То, что вы описываете, звучит примерно как то, что React Framework предоставляет через интерфейс IObservable, но я не знаю точно, позволяет ли он нескольким подписчикам в одном потоке подписки.

Обновление

Если вы посмотрите документацию для IObservable, у нее есть довольно хороший пример того, как вы делаете то, что делаете, с несколькими подписчиками на один объект.

4 голосов
/ 05 августа 2011

Ваш пример переписан с использованием Rx :

// The stream of assets
IObservable<Asset> assets = ...

// The stream of each asset projected to a DebugAnswer
IObservable<DebugAnswer> debugAnswers = from asset in assets
                                        select new DebugAnswer { result = 100 };

// Subscribe the DebugListener to receive the debugAnswers
debugAnswers.Subscribe(DebugListener);

// The stream of each asset projected to an Anwer
IObservable<Answer> answers = from asset in assets
                              select new Answer { bla = 200 };

// Subscribe the AnswerListener to receive the answers
answers.Subscribe(AnswerListener);
1 голос
/ 05 августа 2011

Вам не нужно несколько «слушателей», вам просто нужны компоненты конвейера, которые не являются деструктивными или даже обязательно трансформируемыми.

IEnumerable<T> PassThroughEnumerable<T>(IEnumerable<T> source, Action<T> action) {
    foreach (T t in source) {
       Action(t);
       yield return t;
    }    
}

Или, так как вы обрабатываете в конвейере, просто вызовите некоторые события для использования. Вы можете асинхронизировать их, если хотите:

static IEnumerable<Asset> ToAssets(IDataReader rdr) {
   CheckReaderFormat(rdr);
   var h = this.DebugAsset;
   while (rdr.Read()) {
      var a = new Asset(rdr);
      if (h != null) h(a);
      yield return a;
   }
}

public event EventHandler<Asset> DebugAsset;
1 голос
/ 05 августа 2011

Это в точности работа для Реактивных расширений (стала частью .NET с 4.0, доступна как библиотека в 3.5).

0 голосов
/ 05 августа 2011

Если я вас правильно понял, должна быть возможность заменить или украсить обертку. WrapperDecorator может переадресовывать вызовы на обычный OracleBulkCopy (или что вы используете) и добавлять некоторый пользовательский код отладки.

Это тебе помогает?

Matthias

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...