Apache Beam Windowing на фазе сигналов - PullRequest
1 голос
/ 05 июня 2019

Обновлено : возможно ли окно потока данных на фазе сигналов .

Например, есть поток метки времени, ключа, значения:

[<t0, k1, 0>, <t1, k1, 98>, <t2, k1, 145>, <t4, k1, 0>, <t3, k1, 350>, <t5, k1, 40>, <t6, k1, 65>, <t7, k1, 120>, <t8, k1, 240>, <t9, k1, 352>].

Для ключа k1 будет выведено два окна:

  • t0 - t3: [0, 98, 145, 350]
  • t4 - t9: [0, 40, 65, 120, 240, 352]

Например, каждый раз, когда значение достигает 0, начинайте новое окно для группы.

Ответы [ 2 ]

0 голосов
/ 09 июня 2019

После того, как ваш вопрос отредактирован и уточнен регистр, я бы порекомендовал изучить пользовательские окна, чтобы расширить стандартные сессии .В качестве отправной точки я построил следующий пример (он может быть улучшен).

Через WindowFn.AssignContext мы можем получить доступ к element(), который он превращает в окно в протосеанс.Если оно равно заданному stopValue, то длина окна будет ограничена до минимума вместо использования gapDuration для этой цели:

@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) {
  Duration newGap = c.element().getValue().equals(this.stopValue) ? new Duration(1) : gapDuration;
  return Arrays.asList(new IntervalWindow(c.timestamp(), newGap));
}

Затем при объединении отсортированных окон мы проверим, если ониперекрытия, но также и то, что длительность окна не равна 1 мс.

Collections.sort(sortedWindows);
List<MergeCandidate> merges = new ArrayList<>();
MergeCandidate current = new MergeCandidate();
for (IntervalWindow window : sortedWindows) {
  // get window duration and check if it's a stop session request
  Long windowDuration = new Duration(window.start(), window.end()).getMillis();

  if (current.intersects(window) && !windowDuration.equals(1L)) {
    current.add(window);
  } else {
    merges.add(current);
    current = new MergeCandidate(window);
  }
}
merges.add(current);
for (MergeCandidate merge : merges) {
  merge.apply(c);
}

Конечно, мы также можем добавить некоторый код, чтобы мы могли предоставить различные значения остановки: поле stopValue, поле withStopValue метод, конструкторы, отображение данных при использовании Dataflow Runner и т. Д.

/** Value that closes the session. */
private final Integer stopValue;

/** Creates a {@code StopSessions} {@link WindowFn} with the specified gap duration. */
public static StopSessions withGapDuration(Duration gapDuration) {
  return new StopSessions(gapDuration, 0);
}

/** Creates a {@code StopSessions} {@link WindowFn} with the specified stop value. */
public StopSessions withStopValue(Integer stopValue) {
  return new StopSessions(gapDuration, stopValue);
}

/** Creates a {@code StopSessions} {@link WindowFn} with the specified gap duration and stop value. */
private StopSessions(Duration gapDuration, Integer stopValue) {
  this.gapDuration = gapDuration;
  this.stopValue = stopValue;

Теперь в нашем конвейере мы можем импортировать и использовать новый класс StopSessions с:

import org.apache.beam.sdk.transforms.windowing.StopSessions; // custom one
...

.apply("Window into StopSessions", Window.<KV<String, Integer>>into(StopSessions
  .withGapDuration(Duration.standardSeconds(10))
  .withStopValue(0)))

Чтобы подражать вашему примеру, мы создаем некоторые данные с помощью:

.apply("Create data", Create.timestamped(
    TimestampedValue.of(KV.of("k1", 0), new Instant()), // <t0, k1, 0>
    TimestampedValue.of(KV.of("k1",98), new Instant().plus(1000)), // <t1, k1, 98>
    TimestampedValue.of(KV.of("k1",145), new Instant().plus(2000)), // <t2, k1, 145>
    TimestampedValue.of(KV.of("k1",0), new Instant().plus(4000)), // <t4, k1, 0>
    ...

При стандартных сеансах вывод будет:

user=k1, scores=[0,145,350,120,0,40,65,98,240,352], window=[2019-06-08T19:13:46.785Z..2019-06-08T19:14:05.797Z)

А с пользовательским я получаю следующее:

user=k1, scores=[350,145,98], window=[2019-06-08T21:18:51.395Z..2019-06-08T21:19:03.407Z)
user=k1, scores=[0], window=[2019-06-08T21:18:54.407Z..2019-06-08T21:18:54.408Z)
user=k1, scores=[65,240,352,120,40], window=[2019-06-08T21:18:55.407Z..2019-06-08T21:19:09.407Z)
user=k1, scores=[0], window=[2019-06-08T21:18:50.395Z..2019-06-08T21:18:50.396Z)

Изменение stopValue на .withStopValue(<int>) работает как положено.События 98, 145 and 350 находятся в другом сеансе, чем остальные.Обратите внимание, что это не совсем то же самое, что и в описании, поскольку stopValue назначается отдельному окну вместо нового, но его можно отфильтровать вниз по течению, и это дает вам представление о том, как действовать.Я хотел бы вернуться к этому и также искать реализацию Python.

Все файлы здесь .

0 голосов
/ 05 июня 2019

Скорее всего нет, из вашего описания. Есть как минимум две проблемы:

  • PCollections в Beam неупорядочены и распределены:

    • в модели нет гарантий, что события из одной группы будут поступать в этом порядке;
  • триггеры, управляемые данными, не поддерживаются (возможно, по аналогичным причинам):

Однако вы можете посмотреть на обработку с сохранением состояния и посмотреть, сможете ли вы справиться с этим вручную. Например. Вы накапливаете все входящие события в состоянии, а затем время от времени анализируете накопленные события и выводите результаты.

Или, если вы можете извлечь / назначить общий ключ в своей бизнес-логике, то вы можете проверить, будут ли полезны GroupByKey+ParDo или Combine.

См:

...