Я использую TDF для своего приложения, которое до сих пор прекрасно работает, к сожалению, я наткнулся на конкретную проблему, которая, кажется, не может быть обработана напрямую с существующими механизмами потока данных:
У меня есть N производителей (в данном случае BufferBlocks), которые связаны только с 1 (все с тем же) ActionBlock. Этот блок всегда обрабатывает 1 элемент за раз, а также имеет емкость только для 1 элемента.
К ссылке от производителей на ActionBlock я также хочу добавить фильтр, но особый случай здесь заключается в том, что условие фильтра может изменяться независимо от обработанного элемента, и элемент не должен быть отброшен!
Поэтому я хочу обработать все элементы, но порядок и время могут измениться, когда элемент будет обработан.
К сожалению, я узнал, что если элемент «отклоняется» один раз -> условие фильтра оценивает false, и если этот элемент не передается в другой блок (например, NullTarget), целевой блок не повторяет тот же элемент (и не переоценивает фильтр).
public class ConsumeTest
{
private readonly BufferBlock<int> m_bufferBlock1;
private readonly BufferBlock<int> m_bufferBlock2;
private readonly ActionBlock<int> m_actionBlock;
public ConsumeTest()
{
m_bufferBlock1 = new BufferBlock<int>();
m_bufferBlock2 = new BufferBlock<int>();
var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 };
m_actionBlock = new ActionBlock<int>((item) => BlockAction(item), options);
var start = DateTime.Now;
var elapsed = TimeSpan.FromMinutes(1);
m_bufferBlock1.LinkTo(m_actionBlock, x => IsTimeElapsed(start, elapsed));
m_bufferBlock2.LinkTo(m_actionBlock);
FillBuffers();
}
private void BlockAction(int item)
{
Console.WriteLine(item);
Thread.Sleep(2000);
}
private void FillBuffers()
{
for (int i = 0; i < 1000; i++)
{
if (i % 2 == 0)
{
m_bufferBlock1.Post(i);
}
else
{
m_bufferBlock2.Post(i);
}
}
}
private bool IsTimeElapsed(DateTime start, TimeSpan elapsed)
{
Console.WriteLine("checking time elapsed");
return DateTime.Now > (start + elapsed);
}
public async Task Start()
{
await m_actionBlock.Completion;
}
}
Код устанавливает конвейер тестирования и заполняет два буфера нечетными и четными числами. Оба BufferBlocks подключены к одному ActionBlock, который печатает только «обработанное» число и ждет 2 секунды.
Условие фильтра между m_bufferBlock1 и m_actionBlock проверяет (для целей тестирования), истекла ли 1 минута с тех пор, как мы все это начали.
Если мы запустим это, он выдаст следующий вывод:
1
checking time elapsed
3
5
7
9
11
13
15
17
19
Как мы видим, ActionBlock берет первый элемент из BufferBlock без фильтра, а затем пытается извлечь элемент из BufferBlock с фильтром. Фильтр оценивает false и продолжает отбирать все элементы из блока без фильтра.
Я ожидал, что после того, как элемент из BufferBlock без фильтра будет обработан, он снова попытается извлечь элемент из другого BufferBlock с фильтром, оценивая его снова.
Это будет мой ожидаемый (или желаемый) результат:
1
checking time elapsed
3
checking time elapsed
5
checking time elapsed
7
checking time elapsed
9
checking time elapsed
11
checking time elapsed
13
checking time elapsed
15
// after timer has elapsed take elements also from other buffer
2
17
4
19
Мой вопрос сейчас таков: есть ли способ «сбросить» уже «отклоненное» сообщение, чтобы оно снова оценивалось, или есть другой способ, моделируя его по-другому? В общих чертах, НЕ важно, чтобы они действительно вытягивались из обоих буферов, строго чередуясь! (потому что я знаю, что это зависит от расписания, и вполне нормально, если время от времени снимаются с очереди 2 элемента из одного блока)
Но важно, чтобы «отклоненное» сообщение не было отброшено или помещено в очередь, поскольку важен порядок в одном буфере.
Заранее спасибо