Дублирование данных по окнам для заполнения неопределенного количества мест - PullRequest
2 голосов
/ 13 июня 2019

Я пытаюсь дублировать записи данных в будущих окнах. Проблема, которую это решает, заключается в том, что вычисление статистики для каждого окна будет более точным, поскольку эти данные являются непрерывными (например, температура) и требуют базового значения.

На этих диаграммах каждый блок представляет собой фиксированное окно. Числа в каждом окне представляют данные, поступающие в коллекцию PC, поступающую из источника.

Это пример ввода PCollection:

+---------+---------+---------+--------->
| 1  2    |       3 |         |         |
+---------+---------+---------+--------->

И полученный вывод PCollection:

+---------+---------+---------+--------->
| 1  2    | 2     3 | 3       | 3       |
+---------+---------+---------+--------->

Обратите внимание, как самая последняя точка данных (на основе метки времени события) пересылается в следующее окно. Если имеется несколько пустых окон, значение необходимо переадресовать.

Я решил проблему переадресации один раз, запустив оконную PCollection через DoFn с состоянием, который испускает дополнительный дублированный и модифицированный элемент:

public class DupeFn extends DoFn<Datum, Datum> {
    @StateId("latest")
    private final StateSpec<ValueState<Datum>> latestStateSpec = StateSpecs.value();

    @TimerId("emit")
    private final TimerSpec emitSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void processElement(
            @Element Datum element,
            OutputReceiver<Datum> receiver,
            IntervalWindow window,
            @StateId("latest") ValueState<Datum> latest,
            @TimerId("emit") Timer emit
    ) {
        emit.set(window.maxTimestamp());

        Datum prev = latest.read();

        if (prev == null || element.timestamp > prev.timestamp) {
            latest.write(element);
        }

        receiver.output(element);
    }

    @OnTimer("emit")
    public void emitLatest(
            OutputReceiver<Datum> receiver,
            IntervalWindow window,
            @StateId("latest") ValueState<Datum> latest
    ) {
        Datum last = latest.read();

        // modify the timestamp such that it lands in the next window
        last.timestamp = window.end().getMillis() + 10;
        last.id += " DUPED";

        receiver.outputWithTimestamp(last, new Instant(last.timestamp));
    }
}

Проблема теперь в том, что ничто не будет дублироваться в исходное окно, если есть пустое окно. В идеале поведение должно быть таким, как описано на диаграмме выше.

Есть ли способ сделать это?

Редактировать
Я нашел это соответствующее неопубликованное сообщение в блоге.

1 Ответ

2 голосов
/ 14 июня 2019

В настоящее время существует небольшая проблема с документами Beam, как только это будет исправлено, блог должен появиться. Таймеры зацикливания дадут вам часть решения этой проблемы. Поскольку это обеспечит активность в каждом интервальном окне, даже при отсутствии данных.

Редактировать: блог теперь доступен здесь Ссылка на блог

Следующая часть требует использования Global Windows, что влечет за собой некоторые дополнительные сложности. Об этом будет рассказано на саммите Apache Beam на следующей неделе.

Берлинский саммит

Чтобы сохранить состояние, вам нужно перенаправить агрегаты с фиксированными окнами в GlobalWindow. Однако GlobalWindow не гарантирует порядок, поэтому вам нужно будет следовать чему-то вроде этого потока:

@ ProcessElement

@ OnTimer

  • Чтение и сортировка BaggedList по метке времени
  • Цепное окончательное значение от каждой агрегации до следующей агрегации, если у следующей агрегации нет значения (это было сделано с использованием таймера цикла, а не внешнего источника данных).
  • Вывести все значения, где отметка времени равна <затем OnTimer.Timestamp </li>
  • Очистите список всех уже обработанных элементов, обратите внимание, что сегодня это неэффективно, так как вы не можете удалить определенные элементы из списка. Если вы посмотрите на списки разработчиков в Apache Beam, то в будущем будет приятно обсудить запрос Sorted Map, который будет очень полезен здесь.

Извините, это не короткий ответ!

...