Я реализовал примеры Java и Python для этого варианта использования.Java придерживается подхода, предложенного Даниэлем Оливейрой, но я думаю, что приятно поделиться рабочим образцом.
Версия Java:
Мы можем адаптировать код из Session окон, чтобы соответствовать нашему варианту использования.
Вкратце, когда записи помещаются в окна в сеансах, они назначаются окну, которое начинается с отметки времени элемента (невыровненные окна), и добавляет длительность промежутка к началу вычисления.конец.Функция mergeWindows
затем объединит все перекрывающиеся окна для каждого ключа, что приведет к расширенному сеансу.
Нам потребуется изменить функцию assignWindows
, чтобы создать окно с пропуском, управляемым данными, вместо фиксированногопродолжительность.Мы можем получить доступ к элементу через WindowFn.AssignContext.element()
.Исходная функция присваивания:
public Collection<IntervalWindow> assignWindows(AssignContext c) {
// Assign each element into a window from its timestamp until gapDuration in the
// future. Overlapping windows (representing elements within gapDuration of
// each other) will be merged.
return Arrays.asList(new IntervalWindow(c.timestamp(), gapDuration));
}
Измененная функция будет:
@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) {
// Assign each element into a window from its timestamp until gapDuration in the
// future. Overlapping windows (representing elements within gapDuration of
// each other) will be merged.
Duration dataDrivenGap;
JSONObject message = new JSONObject(c.element().toString());
try {
dataDrivenGap = Duration.standardSeconds(Long.parseLong(message.getString(gapAttribute)));
}
catch(Exception e) {
dataDrivenGap = gapDuration;
}
return Arrays.asList(new IntervalWindow(c.timestamp(), dataDrivenGap));
}
Обратите внимание, что мы добавили пару дополнительных вещей:
- A значение по умолчанию для случаев, когда в данных отсутствует пользовательский разрыв
- Способ установки атрибута из основного конвейера в качестве метода пользовательских окон.
Методы withDefaultGapDuration
и withGapAttribute
:
/** Creates a {@code DynamicSessions} {@link WindowFn} with the specified gap duration. */
public static DynamicSessions withDefaultGapDuration(Duration gapDuration) {
return new DynamicSessions(gapDuration, "");
}
public DynamicSessions withGapAttribute(String gapAttribute) {
return new DynamicSessions(gapDuration, gapAttribute);
}
Мы также добавим новое поле (gapAttribute
) и конструктор:
public class DynamicSessions extends WindowFn<Object, IntervalWindow> {
/** Duration of the gaps between sessions. */
private final Duration gapDuration;
/** Pub/Sub attribute that modifies session gap. */
private final String gapAttribute;
/** Creates a {@code DynamicSessions} {@link WindowFn} with the specified gap duration. */
private DynamicSessions(Duration gapDuration, String gapAttribute) {
this.gapDuration = gapDuration;
this.gapAttribute = gapAttribute;
}
Затем мы можем поместить наши сообщения в новые пользовательские сеансы с помощью:
.apply("Window into sessions", Window.<String>into(DynamicSessions
.withDefaultGapDuration(Duration.standardSeconds(10))
.withGapAttribute("gap"))
Чтобы проверить это, мы будем использовать простой пример с контролируемым вводом.В нашем случае мы рассмотрим различные потребности наших пользователей в зависимости от устройства, на котором запущено приложение.Пользователи настольных компьютеров могут бездействовать долго (с учетом более длительных сеансов), тогда как мы ожидаем, что сеансы с коротким промежутком времени только для наших мобильных пользователей.Мы генерируем некоторые фиктивные данные, в которых некоторые сообщения содержат атрибут gap
, а другие его опускают (для этих окон окно пробела будет использовать значение по умолчанию):
.apply("Create data", Create.timestamped(
TimestampedValue.of("{\"user\":\"mobile\",\"score\":\"12\",\"gap\":\"5\"}", new Instant()),
TimestampedValue.of("{\"user\":\"desktop\",\"score\":\"4\"}", new Instant()),
TimestampedValue.of("{\"user\":\"mobile\",\"score\":\"-3\",\"gap\":\"5\"}", new Instant().plus(2000)),
TimestampedValue.of("{\"user\":\"mobile\",\"score\":\"2\",\"gap\":\"5\"}", new Instant().plus(9000)),
TimestampedValue.of("{\"user\":\"mobile\",\"score\":\"7\",\"gap\":\"5\"}", new Instant().plus(12000)),
TimestampedValue.of("{\"user\":\"desktop\",\"score\":\"10\"}", new Instant().plus(12000)))
.withCoder(StringUtf8Coder.of()))
Визуально:
Для пользователя настольного компьютера только два события разделены 12 секундами.Разрыв не указан, поэтому по умолчанию он равен 10 с, и оба балла не будут суммироваться, поскольку они будут принадлежать к разным сеансам.
У другого мобильного пользователя 4 события разделены на 2, 7 и 3 секунды соответственно.Ни одно из временных разделений не превышает разрыв по умолчанию, поэтому при стандартных сеансах все события будут принадлежать одному сеансу с добавленной оценкой 18:
user=desktop, score=4, window=[2019-05-26T13:28:49.122Z..2019-05-26T13:28:59.122Z)
user=mobile, score=18, window=[2019-05-26T13:28:48.582Z..2019-05-26T13:29:12.774Z)
user=desktop, score=10, window=[2019-05-26T13:29:03.367Z..2019-05-26T13:29:13.367Z)
В новых сеансах мы указываем атрибут «разрыв»5 секунд до тех событий.Третье сообщение приходит через 7 секунд после второго, и теперь оно переходит в другой сеанс.Предыдущая большая сессия с результатом 18 будет разделена на две 9-балльные сессии:
user=desktop, score=4, window=[2019-05-26T14:30:22.969Z..2019-05-26T14:30:32.969Z)
user=mobile, score=9, window=[2019-05-26T14:30:22.429Z..2019-05-26T14:30:30.553Z)
user=mobile, score=9, window=[2019-05-26T14:30:33.276Z..2019-05-26T14:30:41.849Z)
user=desktop, score=10, window=[2019-05-26T14:30:37.357Z..2019-05-26T14:30:47.357Z)
Полный код здесь .Протестировано с Java SDK 2.13.0
Версия Python:
Аналогично, мы можем распространить тот же подход на Python SDK.Код для Sessions
класса можно найти здесь .Мы определим новый класс DynamicSessions
.Внутри метода assign
мы можем получить доступ к обработанной записи, используя context.element
и изменить разрыв в соответствии с данными.
Оригинал:
def assign(self, context):
timestamp = context.timestamp
return [IntervalWindow(timestamp, timestamp + self.gap_size)]
Extended:
def assign(self, context):
timestamp = context.timestamp
try:
gap = Duration.of(context.element[1][“gap”])
except:
gap = self.gap_size
return [IntervalWindow(timestamp, timestamp + gap)]
Если входные данные содержат поле gap
, оно будет использоваться для переопределения размера пропуска по умолчанию.В нашем коде конвейера нам просто нужно поместить события в DynamicSessions
вместо стандартного Sessions
:
'user_session_window' >> beam.WindowInto(DynamicSessions(gap_size=gap_size),
timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW)
При стандартных сеансах вывод будет следующим:
INFO:root:>> User mobile had 4 events with total score 18 in a 0:00:22 session
INFO:root:>> User desktop had 1 events with total score 4 in a 0:00:10 session
INFO:root:>> User desktop had 1 events with total score 10 in a 0:00:10 session
Снаши пользовательские оконные мобильные события разделены на две разные сессии:
INFO:root:>> User mobile had 2 events with total score 9 in a 0:00:08 session
INFO:root:>> User mobile had 2 events with total score 9 in a 0:00:07 session
INFO:root:>> User desktop had 1 events with total score 4 in a 0:00:10 session
INFO:root:>> User desktop had 1 events with total score 10 in a 0:00:10 session
Все файлы здесь .Протестировано с Python SDK 2.13.0