Apache Kafka Группировка дважды - PullRequest
0 голосов
/ 26 июня 2018

Я пишу приложение, в котором пытаюсь подсчитать количество пользователей, которые посещают страницу каждый час. Я пытаюсь отфильтровать по определенным событиям, сгруппировать по userId и времени часа события, а затем сгруппировать по часам, чтобы узнать количество пользователей. Но группировка KTable вызывает чрезмерное сгорание процессора и блокировку при попытке закрыть потоки. Есть ли лучший способ сделать это?

    events
   .groupBy(...)
   .aggregate(...)
   .groupBy(...);
   .count();

1 Ответ

0 голосов
/ 28 июня 2018

Учитывая ответ на ваш вопрос выше "Я просто хочу узнать в течение часового промежутка времени количество пользователей, которые выполнили определенное действие", я бы предложил следующее.

Предположим, у вас есть что-то вроде этого:

class ActionRecord {
  String actionType;
  String user;
}

Вы можете определить агрегатный класс примерно так:

class ActionRecordAggregate {
  private Set<String> users = new HashSet<>();

  public void add(ActionRecord rec) {
    users.add(rec.getUser());
  }

  public int count() {
    return users.size();
  }

}

Тогда ваше потоковое приложение может:

  • принять события
  • перепишите их в соответствии с типом события (.map())
  • сгруппировать их по типу события (.groupByKey())
  • Окно их по времени (выбрано 1 минута, но YMMV)
  • объединить их в ActionRecordAggregate
  • материализуйте их в StateStore

так это выглядит примерно так:

stream()
.map((key, val) -> KeyValue.pair(val.actionType, val)) 
.groupByKey() 
.windowedBy(TimeWindows.of(60*1000)) 
.aggregate(
  ActionRecordAggregate::new, 
  (key, value, agg) -> agg.add(value),
  Materialized
      .<String, ActionRecordAggregate, WindowStore<Bytes, byte[]>>as("actionTypeLookup")
      .withValueSerde(getSerdeForActionRecordAggregate())
);

Затем, чтобы получить события обратно, вы можете запросить хранилище состояний:

ReadOnlyWindowStore<String, ActionRecordAggregate> store = 
  streams.store("actionTypeLookup", QueryableStoreTypes.windowStore());

WindowStoreIterator<ActionRecordAggregate> wIt = 
  store.fetch("actionTypeToGet", startTimestamp, endTimestamp);

int totalCount = 0;
while(wIt.hasNext()) {
  totalCount += wIt.next().count();
}

// totalCount is the number of distinct users in your 
// time interval that raised action type "actionTypeToGet"

Надеюсь, это поможет!

...