Как синхронизировать два конвейера TPL DataFlow? - PullRequest
1 голос
/ 22 апреля 2019

Я хочу написать приложение, которое оценивает данные датчика от двух датчиков.Оба датчика отправляют свои данные в Package объектах, которые разбиты на Frame объектов.A Package - это, по существу, Tuple<Timestamp, Data[]>, Frame - это Tuple<Timestamp, Data>.Тогда мне нужно всегда использовать Frame с самой ранней отметкой времени из обоих источников.

Так что в основном мой объектный поток

Package -(1:n)-> Frame \
                        }-pair synchronized-> Tuple<Frame, Frame>
Package -(1:n)-> Frame /

Пример

Предположимкаждый Package содержит 2 или 3 значения (реальность: 5-7) и целочисленные временные метки, которые увеличиваются на 1 (реальность: ~ 200 Гц => ~ 5 мс)."Данные" просто timestamp * 100 ради простоты.

Packages (timestamp, values[])

Source 1:
{(19, [1700, 1800, 1900]), (22, [2000, 2100, 2200]), (26, [2500, 2600]),
 (29, [2700, 2800, 2900]), ...}

Source 2:
{(17, [1500, 1600, 1700]), (19, [1800, 1900]), (21, [2000, 2100]),
 (26, [2400, 2500, 2600]), ...}

После (1:n) шагов:

Frames (timestamp, value)

Source 1:
{(17, 1700), (18, 1800), (19, 1900), (20, 2000), (21, 2100),
 (22, 2200), (25, 2500), (26, 2600), (27, 2700), (28, 2800),
 (29, 2900), ...}

Source 2:
{(15, 1500), (16, 1600), (17, 1700), (18, 1800), (19, 1900),
 (20, 2000), (21, 2100), (24, 2400), (25, 2500), (26, 2600), ...}

После шага pair synchronized:

Merged tuples (timestamp, source1, source2)

{(15, null, 1500), (16, null, 1600), (17, 1700, 1700), (18, 1800, 1800),
 (19, 1900, 1900), (20, 2000, 2000), (21, 2100, 2100), (22, 2200, null),
 (24, null, 2400), (25, 2500, 2500), (26, 2600, 2600), ...}

Обратите внимание, что отметка времени 23 отсутствует, поскольку none обоих источников отправили значение.Это просто побочный эффект.Я могу вставить пустой кортеж или нет, не имеет значения.Также не имеет значения, является ли кортеж (27, 2700, 2700) или ((27, 2700), (27, 2700)), то есть Tuple<Timestamp, Data, Data> или Tuple<Frame, Frame>.


Я почти уверен, что часть (1:n) должна быть TransformManyBlock<Package, Frame>, если я правильно понял документацию.

Но какой блок я использую для pair synchronized части? Сначала яЯ думал, что JoinBlock<Frame, Frame> будет то, что я искал, но, похоже, это просто пары двух элементов по индексу.Но поскольку не гарантируется, что оба конвейера запускаются с одной и той же временной меткой, и что оба конвейера всегда будут генерировать устойчивый поток непрерывных временных меток (поскольку иногда пакеты с несколькими кадрами могут быть потеряны при передаче), это не вариант.Так что мне нужно больше «MergeBlock» с возможностью решить, какой элемент обоих входных потоков будет распространяться на выход далее (если есть).

Я подумал, что мне придется написать что-то вроде этогосебя.Но у меня проблемы с написанием кода, который правильно обрабатывает две переменные ISourceBlock и одну переменную ITargetBlock.Я в основном застрял как можно раньше:

private void MergeSynchronized(
    ISourceBlock<Frame> source1,
    ISourceBlock<Frame> source2,
    ITargetBlock<Tuple<Frame, Frame>> target)
{
  var frame1 = source1.Receive();
  var frame2 = source2.Receive();

  //Loop {
  //  Depending on the timestamp [mis]match,
  //  either pair frame1+frame2 or frame1+null or null+frame2, and
  //  replace whichever frame(s) was/were propagated already
  //  with the next frame from the respective pipeline
  //}
}

Я даже не уверен насчет этого черновика: должен ли метод быть async, чтобы я мог использовать var frame1 = await source1.ReceiveAsnyc();?Каково состояние цикла?Где и как проверить завершение?Как решить очевидную проблему, заключающуюся в том, что мой код означает, что я должен ждать, пока разрыв в потоке не станет сверх , чтобы понять, что разрыв существует?

Альтернатива, о которой я подумал, - добавитьдополнительный блок в конвейерах, гарантирующий, что в конвейер помещается достаточное количество «дозорных кадров» для каждого датчика, так что выравнивание всегда первого из каждого конвейера будет выравнивать правильные два.Я догадываюсь , что будет своего рода TransformManyBlock, который читает кадр, сравнивает «ожидаемую» временную метку с фактической временной меткой, а затем вставляет сторожевые кадры для отсутствующих временных меток, пока временная метка фрейма снова не станет правильной.

Или часть pair synchronized предназначена для остановки с объектами потока данных TPL и запуска фактического кода, который уже работает с частью Data?

...