После того, как ваш вопрос отредактирован и уточнен регистр, я бы порекомендовал изучить пользовательские окна, чтобы расширить стандартные сессии .В качестве отправной точки я построил следующий пример (он может быть улучшен).
Через WindowFn.AssignContext
мы можем получить доступ к element()
, который он превращает в окно в протосеанс.Если оно равно заданному stopValue
, то длина окна будет ограничена до минимума вместо использования gapDuration
для этой цели:
@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) {
Duration newGap = c.element().getValue().equals(this.stopValue) ? new Duration(1) : gapDuration;
return Arrays.asList(new IntervalWindow(c.timestamp(), newGap));
}
Затем при объединении отсортированных окон мы проверим, если ониперекрытия, но также и то, что длительность окна не равна 1 мс.
Collections.sort(sortedWindows);
List<MergeCandidate> merges = new ArrayList<>();
MergeCandidate current = new MergeCandidate();
for (IntervalWindow window : sortedWindows) {
// get window duration and check if it's a stop session request
Long windowDuration = new Duration(window.start(), window.end()).getMillis();
if (current.intersects(window) && !windowDuration.equals(1L)) {
current.add(window);
} else {
merges.add(current);
current = new MergeCandidate(window);
}
}
merges.add(current);
for (MergeCandidate merge : merges) {
merge.apply(c);
}
Конечно, мы также можем добавить некоторый код, чтобы мы могли предоставить различные значения остановки: поле stopValue
, поле withStopValue
метод, конструкторы, отображение данных при использовании Dataflow Runner и т. Д.
/** Value that closes the session. */
private final Integer stopValue;
/** Creates a {@code StopSessions} {@link WindowFn} with the specified gap duration. */
public static StopSessions withGapDuration(Duration gapDuration) {
return new StopSessions(gapDuration, 0);
}
/** Creates a {@code StopSessions} {@link WindowFn} with the specified stop value. */
public StopSessions withStopValue(Integer stopValue) {
return new StopSessions(gapDuration, stopValue);
}
/** Creates a {@code StopSessions} {@link WindowFn} with the specified gap duration and stop value. */
private StopSessions(Duration gapDuration, Integer stopValue) {
this.gapDuration = gapDuration;
this.stopValue = stopValue;
Теперь в нашем конвейере мы можем импортировать и использовать новый класс StopSessions
с:
import org.apache.beam.sdk.transforms.windowing.StopSessions; // custom one
...
.apply("Window into StopSessions", Window.<KV<String, Integer>>into(StopSessions
.withGapDuration(Duration.standardSeconds(10))
.withStopValue(0)))
Чтобы подражать вашему примеру, мы создаем некоторые данные с помощью:
.apply("Create data", Create.timestamped(
TimestampedValue.of(KV.of("k1", 0), new Instant()), // <t0, k1, 0>
TimestampedValue.of(KV.of("k1",98), new Instant().plus(1000)), // <t1, k1, 98>
TimestampedValue.of(KV.of("k1",145), new Instant().plus(2000)), // <t2, k1, 145>
TimestampedValue.of(KV.of("k1",0), new Instant().plus(4000)), // <t4, k1, 0>
...
При стандартных сеансах вывод будет:
user=k1, scores=[0,145,350,120,0,40,65,98,240,352], window=[2019-06-08T19:13:46.785Z..2019-06-08T19:14:05.797Z)
А с пользовательским я получаю следующее:
user=k1, scores=[350,145,98], window=[2019-06-08T21:18:51.395Z..2019-06-08T21:19:03.407Z)
user=k1, scores=[0], window=[2019-06-08T21:18:54.407Z..2019-06-08T21:18:54.408Z)
user=k1, scores=[65,240,352,120,40], window=[2019-06-08T21:18:55.407Z..2019-06-08T21:19:09.407Z)
user=k1, scores=[0], window=[2019-06-08T21:18:50.395Z..2019-06-08T21:18:50.396Z)
Изменение stopValue
на .withStopValue(<int>)
работает как положено.События 98, 145 and 350
находятся в другом сеансе, чем остальные.Обратите внимание, что это не совсем то же самое, что и в описании, поскольку stopValue
назначается отдельному окну вместо нового, но его можно отфильтровать вниз по течению, и это дает вам представление о том, как действовать.Я хотел бы вернуться к этому и также искать реализацию Python.
Все файлы здесь .