Kafka Streams обнаруживает пропущенные записи - PullRequest
0 голосов
/ 04 января 2019

Я создаю потоковое приложение через Kafka Streams 2.10 и столкнулся с концептуальной проблемой.

The producer1 sends (Key -> Value): Session1 -> RUNNING

The producer2 sends (Key -> Value): Sessionabc -> RUNNING

The producer1 sends (Key -> Value): Session1 -> DONE

Теперь я хочу обнаружить мертвую сессию. Я пытаюсь использовать SessionWindow, но поскольку Кафка вычисляет запись по записи, я не могу вычислить все сразу.

Вот мой фрагмент:

builder
    .stream("topic", Consumed.with(serdeKeySessionEvent, serdeValueSessionEvent))
    .groupByKey(Grouped.with(serdeKeySessionEvent, serdeValueSessionEvent))
    .windowedBy(SessionWindows.with(SESSION_DURATION))
    .reduce(new SessionReducer())
    .toStream((windowed, value) -> windowed.key())
    .filter((k,v)-> Objects.nonNull(v) && v.getStatus() == Status.RUNNING)
    .peek((a,b)->System.out.println("This Value is missing: \n   "+a.toString()+b.toString()));`

Примечание: редуктор просто гарантирует, что когда мы видим DONE независимо от того, какой другой элемент у нас есть для того же сеанса, это всегда будет сделано. Есть идеи?

1 Ответ

0 голосов
/ 09 января 2019

С помощью Processor API это можно сделать с помощью небольшого количества кода.DSL можно смешивать с Processor API.

Обработка будет выглядеть следующим образом.

  1. Построить хранилище состояний и добавить его с помощью StreamsBuilder::addStateStore
  2. Создать KStream и вызвать *Функция 1009 * с Transformer, которая выполняет всю работу
  3. Результатом преобразования будут сообщения с информацией, если сессия DEAD или DONE
  4. Использование TransformerВы реализуете, как каждое сообщение должно быть обработано.Для каждого сообщения вы должны обновить keyValue Store, где ключом является идентификатор сессии.Вы должны сохранить метку времени последнего сообщения относительно сеанса
  5. Затем в Punctuator (который вызывается периодически) вы проверяете, какой сеанс является TIMEOUT, и передаете информацию, используя ProcessorContext::forward со статусом (DONE, DEAD)

Весь код, как это сделать, вы можете найти здесь

...