Масштабирование приложений Kafka Streams - PullRequest
0 голосов
/ 26 февраля 2019

У меня есть потоковое приложение, которое читает данные из темы.Для каждого сообщения по теме оно будет:

  • сопоставить ключ из ID_установки: ID ресурса: ID модели с идентификатором установки: ИД актива: идентификатор модели: тег
  • сгруппировать по ключу
  • окно на 15 минут
  • уменьшить, чтобы определить и сохранить максимальное значение всех сообщений в окне

Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> materialized = Materialized .as("store");

stream
      .map((s, myObject) -> new KeyValue<>(s + "::" + myObject.getT(),
            new ErrorScore(myObject.getTs(), myObject.getE(), myObject.getO())))
            .groupByKey()
            .windowedBy(configuredTimeWindows)
            .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, 
             newValue), materialized);

В конце концов я будув итоге:

1551168000 1234:a:v1  0.1
1551168900 1234:a:v1  0.2
1551169800 1234:a:v1  0.1

Таким образом, есть три окна, в которых есть ключ и максимальное значение всех сообщений, которые попадают в границы окна.

Тогда у меня естьконечная точка отдыха, которая используется для запроса данных в хранилище состояний / журналах изменений с использованием метаданных.Это означает, что я буду использовать

InteractiveQueryService.getHostInfo()

, чтобы увидеть, находится ли определенный ключ на текущем хосте, или что информация должна быть найдена на другом хосте.

Проблема, с которой я сейчас сталкиваюсь, заключается в следующем: каждый раз, когда я вызываю остальную конечную точку, он должен будет определять по ключу, находятся ли запрошенные данные на этом хосте или на удаленном хосте.если данные не найдены на этом хосте, он вызовет конечную точку отдыха для этого единственного ключа.Поскольку я заинтересован в 200 ключах, это заставит меня создать 200 соединений для сбора данных, распределенных по разным экземплярам.К сожалению, это отнимает слишком много времени.

Я попытался найти решение, например, использовать более грубый ключ, например просто instalId: assetId: modelId и тем самым исключить тег из ключа.Это, однако, приводит к невозможности определить максимальное значение для модели.

Есть идеи, как справиться с этим?

...