Как сбросить отложенное / отклоненное сообщение в потоке данных TPL - PullRequest
1 голос
/ 24 апреля 2019

Я использую TDF для своего приложения, которое до сих пор прекрасно работает, к сожалению, я наткнулся на конкретную проблему, которая, кажется, не может быть обработана напрямую с существующими механизмами потока данных:

У меня есть N производителей (в данном случае BufferBlocks), которые связаны только с 1 (все с тем же) ActionBlock. Этот блок всегда обрабатывает 1 элемент за раз, а также имеет емкость только для 1 элемента.

К ссылке от производителей на ActionBlock я также хочу добавить фильтр, но особый случай здесь заключается в том, что условие фильтра может изменяться независимо от обработанного элемента, и элемент не должен быть отброшен! Поэтому я хочу обработать все элементы, но порядок и время могут измениться, когда элемент будет обработан.

К сожалению, я узнал, что если элемент «отклоняется» один раз -> условие фильтра оценивает false, и если этот элемент не передается в другой блок (например, NullTarget), целевой блок не повторяет тот же элемент (и не переоценивает фильтр).

public class ConsumeTest
  {
    private readonly BufferBlock<int> m_bufferBlock1;
    private readonly BufferBlock<int> m_bufferBlock2;
    private readonly ActionBlock<int> m_actionBlock;

    public ConsumeTest()
    {
      m_bufferBlock1 = new BufferBlock<int>();
      m_bufferBlock2 = new BufferBlock<int>();

      var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 };
      m_actionBlock = new ActionBlock<int>((item) => BlockAction(item), options);

      var start = DateTime.Now;
      var elapsed = TimeSpan.FromMinutes(1);

      m_bufferBlock1.LinkTo(m_actionBlock, x => IsTimeElapsed(start, elapsed));
      m_bufferBlock2.LinkTo(m_actionBlock);

      FillBuffers();
    }

    private void BlockAction(int item)
    {
      Console.WriteLine(item);
      Thread.Sleep(2000);
    }

    private void FillBuffers()
    {
      for (int i = 0; i < 1000; i++)
      {
        if (i % 2 == 0)
        {
          m_bufferBlock1.Post(i);
        }
        else
        {
          m_bufferBlock2.Post(i);
        }
      }
    }

    private bool IsTimeElapsed(DateTime start, TimeSpan elapsed)
    {
      Console.WriteLine("checking time elapsed");
      return DateTime.Now > (start + elapsed);
    }

    public async Task Start()
    {
      await m_actionBlock.Completion;
    }
  }

Код устанавливает конвейер тестирования и заполняет два буфера нечетными и четными числами. Оба BufferBlocks подключены к одному ActionBlock, который печатает только «обработанное» число и ждет 2 секунды.

Условие фильтра между m_bufferBlock1 и m_actionBlock проверяет (для целей тестирования), истекла ли 1 минута с тех пор, как мы все это начали.

Если мы запустим это, он выдаст следующий вывод:

1
checking time elapsed
3
5
7
9
11
13
15
17
19

Как мы видим, ActionBlock берет первый элемент из BufferBlock без фильтра, а затем пытается извлечь элемент из BufferBlock с фильтром. Фильтр оценивает false и продолжает отбирать все элементы из блока без фильтра.

Я ожидал, что после того, как элемент из BufferBlock без фильтра будет обработан, он снова попытается извлечь элемент из другого BufferBlock с фильтром, оценивая его снова.

Это будет мой ожидаемый (или желаемый) результат:

1
checking time elapsed
3
checking time elapsed
5
checking time elapsed
7
checking time elapsed
9
checking time elapsed
11
checking time elapsed
13
checking time elapsed
15
// after timer has elapsed take elements also from other buffer
2
17
4
19

Мой вопрос сейчас таков: есть ли способ «сбросить» уже «отклоненное» сообщение, чтобы оно снова оценивалось, или есть другой способ, моделируя его по-другому? В общих чертах, НЕ важно, чтобы они действительно вытягивались из обоих буферов, строго чередуясь! (потому что я знаю, что это зависит от расписания, и вполне нормально, если время от времени снимаются с очереди 2 элемента из одного блока) Но важно, чтобы «отклоненное» сообщение не было отброшено или помещено в очередь, поскольку важен порядок в одном буфере.

Заранее спасибо

...