Создание разных сессий на основе уникального ключа - PullRequest
1 голос
/ 09 апреля 2019

Я получаю сообщения из темы kafka, которая отправляет мне сообщение JSON. Я хотел бы извлечь поле из этого сообщения json (которое может быть, например, идентификатором) и Я хотел бы создать 'n' сеансов для 'n' уникальных идентификаторов устройства .

Я попытался создать новый экземпляр сеанса для каждого уникального идентификатора, который я получаю, но после создания нового экземпляра окна сеанса, т. Е. Создания новой ветви в конвейере для каждого идентификатора, я не могу отправить следующие будущие сообщения в соответствующая ветвь, которая уже существует.

Ожидаемый результат, который я хочу получить, предположим, что мы получаем сообщения типа

{ID: 1, ...}, {ID: 2, ...}, {ID: 3, ...}, {ID: 1, ...}

Будет создано три разных сеанса, и четвертое сообщение будет отправлено в сеанс с идентификатором устройства 1. Есть ли способ сделать это в парадигме программирования Apache Beam или в парадигме программирования Java? Любая помощь будет принята с благодарностью.

Ответы [ 2 ]

2 голосов
/ 16 апреля 2019

Да, это возможно с помощью парадигмы Beam, если вы используете пользовательский WindowFn. Вы можете создать подкласс класса Sessions и изменить его, чтобы установить длительность промежутка по-разному в зависимости от идентификатора каждого элемента. Вы можете сделать это в assignWindows, который выглядит так в Sessions:

  @Override
  public Collection<IntervalWindow> assignWindows(AssignContext c) {
    // Assign each element into a window from its timestamp until gapDuration in the
    // future.  Overlapping windows (representing elements within gapDuration of
    // each other) will be merged.
    return Arrays.asList(new IntervalWindow(c.timestamp(), gapDuration));
  }

Класс AssignContext может использоваться для доступа к элементу, которому назначено это окно, что позволит вам получить идентификатор этого элемента.

Также звучит так, как будто вы хотите, чтобы элементы с разными идентификаторами были сгруппированы в разных окнах (т. Е. Если элементы A и B входят в промежуток времени, но с разными идентификаторами, они все равно должны находиться в разных окнах). Это можно сделать, выполнив GroupByKey с идентификатором ваших элементов в качестве ключей. Окна сеансов применяются для каждого ключа , как описано в Руководстве по программированию луча , поэтому элементы будут разделены по идентификаторам.

0 голосов
/ 14 июля 2019

Я реализовал примеры Java и Python для этого варианта использования.Java придерживается подхода, предложенного Даниэлем Оливейрой, но я думаю, что приятно поделиться рабочим образцом.


Версия Java:

Мы можем адаптировать код из Session окон, чтобы соответствовать нашему варианту использования.

Вкратце, когда записи помещаются в окна в сеансах, они назначаются окну, которое начинается с отметки времени элемента (невыровненные окна), и добавляет длительность промежутка к началу вычисления.конец.Функция mergeWindows затем объединит все перекрывающиеся окна для каждого ключа, что приведет к расширенному сеансу.

Нам потребуется изменить функцию assignWindows, чтобы создать окно с пропуском, управляемым данными, вместо фиксированногопродолжительность.Мы можем получить доступ к элементу через WindowFn.AssignContext.element().Исходная функция присваивания:

public Collection<IntervalWindow> assignWindows(AssignContext c) {
  // Assign each element into a window from its timestamp until gapDuration in the
  // future.  Overlapping windows (representing elements within gapDuration of
  // each other) will be merged.
  return Arrays.asList(new IntervalWindow(c.timestamp(), gapDuration));
}

Измененная функция будет:

@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) {
  // Assign each element into a window from its timestamp until gapDuration in the
  // future.  Overlapping windows (representing elements within gapDuration of
  // each other) will be merged.
  Duration dataDrivenGap;
  JSONObject message = new JSONObject(c.element().toString());

  try {
    dataDrivenGap = Duration.standardSeconds(Long.parseLong(message.getString(gapAttribute)));
  }
  catch(Exception e) {
    dataDrivenGap = gapDuration;
  }
  return Arrays.asList(new IntervalWindow(c.timestamp(), dataDrivenGap));
}

Обратите внимание, что мы добавили пару дополнительных вещей:

  • A значение по умолчанию для случаев, когда в данных отсутствует пользовательский разрыв
  • Способ установки атрибута из основного конвейера в качестве метода пользовательских окон.

Методы withDefaultGapDuration и withGapAttribute:

/** Creates a {@code DynamicSessions} {@link WindowFn} with the specified gap duration. */
public static DynamicSessions withDefaultGapDuration(Duration gapDuration) {
  return new DynamicSessions(gapDuration, "");
}

public DynamicSessions withGapAttribute(String gapAttribute) {
  return new DynamicSessions(gapDuration, gapAttribute);
}

Мы также добавим новое поле (gapAttribute) и конструктор:

public class DynamicSessions extends WindowFn<Object, IntervalWindow> {
  /** Duration of the gaps between sessions. */
  private final Duration gapDuration;

    /** Pub/Sub attribute that modifies session gap. */
  private final String gapAttribute;

  /** Creates a {@code DynamicSessions} {@link WindowFn} with the specified gap duration. */
  private DynamicSessions(Duration gapDuration, String gapAttribute) {
    this.gapDuration = gapDuration;
    this.gapAttribute = gapAttribute;
  }

Затем мы можем поместить наши сообщения в новые пользовательские сеансы с помощью:

.apply("Window into sessions", Window.<String>into(DynamicSessions
  .withDefaultGapDuration(Duration.standardSeconds(10))
  .withGapAttribute("gap"))

Чтобы проверить это, мы будем использовать простой пример с контролируемым вводом.В нашем случае мы рассмотрим различные потребности наших пользователей в зависимости от устройства, на котором запущено приложение.Пользователи настольных компьютеров могут бездействовать долго (с учетом более длительных сеансов), тогда как мы ожидаем, что сеансы с коротким промежутком времени только для наших мобильных пользователей.Мы генерируем некоторые фиктивные данные, в которых некоторые сообщения содержат атрибут gap, а другие его опускают (для этих окон окно пробела будет использовать значение по умолчанию):

.apply("Create data", Create.timestamped(
            TimestampedValue.of("{\"user\":\"mobile\",\"score\":\"12\",\"gap\":\"5\"}", new Instant()),
            TimestampedValue.of("{\"user\":\"desktop\",\"score\":\"4\"}", new Instant()),
            TimestampedValue.of("{\"user\":\"mobile\",\"score\":\"-3\",\"gap\":\"5\"}", new Instant().plus(2000)),
            TimestampedValue.of("{\"user\":\"mobile\",\"score\":\"2\",\"gap\":\"5\"}", new Instant().plus(9000)),
            TimestampedValue.of("{\"user\":\"mobile\",\"score\":\"7\",\"gap\":\"5\"}", new Instant().plus(12000)),
            TimestampedValue.of("{\"user\":\"desktop\",\"score\":\"10\"}", new Instant().plus(12000)))
        .withCoder(StringUtf8Coder.of()))

Визуально:

enter image description here

Для пользователя настольного компьютера только два события разделены 12 секундами.Разрыв не указан, поэтому по умолчанию он равен 10 с, и оба балла не будут суммироваться, поскольку они будут принадлежать к разным сеансам.

У другого мобильного пользователя 4 события разделены на 2, 7 и 3 секунды соответственно.Ни одно из временных разделений не превышает разрыв по умолчанию, поэтому при стандартных сеансах все события будут принадлежать одному сеансу с добавленной оценкой 18:

user=desktop, score=4, window=[2019-05-26T13:28:49.122Z..2019-05-26T13:28:59.122Z)
user=mobile, score=18, window=[2019-05-26T13:28:48.582Z..2019-05-26T13:29:12.774Z)
user=desktop, score=10, window=[2019-05-26T13:29:03.367Z..2019-05-26T13:29:13.367Z)

В новых сеансах мы указываем атрибут «разрыв»5 секунд до тех событий.Третье сообщение приходит через 7 секунд после второго, и теперь оно переходит в другой сеанс.Предыдущая большая сессия с результатом 18 будет разделена на две 9-балльные сессии:

user=desktop, score=4, window=[2019-05-26T14:30:22.969Z..2019-05-26T14:30:32.969Z)
user=mobile, score=9, window=[2019-05-26T14:30:22.429Z..2019-05-26T14:30:30.553Z)
user=mobile, score=9, window=[2019-05-26T14:30:33.276Z..2019-05-26T14:30:41.849Z)
user=desktop, score=10, window=[2019-05-26T14:30:37.357Z..2019-05-26T14:30:47.357Z)

Полный код здесь .Протестировано с Java SDK 2.13.0


Версия Python:

Аналогично, мы можем распространить тот же подход на Python SDK.Код для Sessions класса можно найти здесь .Мы определим новый класс DynamicSessions.Внутри метода assign мы можем получить доступ к обработанной записи, используя context.element и изменить разрыв в соответствии с данными.

Оригинал:

def assign(self, context):
  timestamp = context.timestamp
  return [IntervalWindow(timestamp, timestamp + self.gap_size)]

Extended:

def assign(self, context):
  timestamp = context.timestamp

  try:
    gap = Duration.of(context.element[1][“gap”])
  except:
    gap = self.gap_size

  return [IntervalWindow(timestamp, timestamp + gap)]

Если входные данные содержат поле gap, оно будет использоваться для переопределения размера пропуска по умолчанию.В нашем коде конвейера нам просто нужно поместить события в DynamicSessions вместо стандартного Sessions:

'user_session_window'   >> beam.WindowInto(DynamicSessions(gap_size=gap_size),
                                             timestamp_combiner=window.TimestampCombiner.OUTPUT_AT_EOW)

При стандартных сеансах вывод будет следующим:

INFO:root:>> User mobile had 4 events with total score 18 in a 0:00:22 session
INFO:root:>> User desktop had 1 events with total score 4 in a 0:00:10 session
INFO:root:>> User desktop had 1 events with total score 10 in a 0:00:10 session

Снаши пользовательские оконные мобильные события разделены на две разные сессии:

INFO:root:>> User mobile had 2 events with total score 9 in a 0:00:08 session
INFO:root:>> User mobile had 2 events with total score 9 in a 0:00:07 session
INFO:root:>> User desktop had 1 events with total score 4 in a 0:00:10 session
INFO:root:>> User desktop had 1 events with total score 10 in a 0:00:10 session

Все файлы здесь .Протестировано с Python SDK 2.13.0

...