M.Djx,
Я не думаю, что сейчас есть идеальное решение для этого варианта использования в Kafka Streams, но у меня есть несколько мыслей, чтобы приблизить вас.Я готовлюсь представить KIP для точного рассмотрения вариантов использования, подобных этому, в ближайшем будущем.
Один момент: в отличие от KTable, KStreams не являются журналами изменений, поэтому более новые события не перезаписывают более старые события с тем жеключ;они просто сосуществуют в одном потоке.Я думаю, именно поэтому ваш foreach
делает так, что все предупреждения не имеют никакого вмешательства;вы видите промежуточные события соединения до вмешательств.
Например:
LEFT RIGHT JOIN
a:1 a:(1,null)
a:X a:(1,X)
foreach
будет вызвано для обоих результатов соединения, что будет выглядеть как правильное значениеотсутствует, когда на самом деле немного поздно.
Если вы примените временное окно к потоку результатов, вы получите журнал изменений - более новые значения перезапишут более старые.Что-то вроде:
joinedAI
.groupByKey()
.windowedBy(
TimeWindows
.of(1000 * 60 * 60 * 24) // the window will be 24 hours in size
.until(1000 * 60 * 60 * 48) // and we'll keep it in the state store for at least 48 hours
).reduce(
new Reducer<JsonNode>() {
@Override
public Long apply(final JsonNode value1, final JsonNode value2) {
return value2;
}
},
Materialized.<String, JsonNode, WindowStore<Bytes, byte[]>>as("alerts-without-interventions")
);
Облом - то, что это создаст поток журнала изменений с правильной семантикой, но вы все равно увидите промежуточные значения, поэтому вы не захотите запускать какие-либо действия прямо из этогоПоток либо (как foreach
).
Одна вещь, которую вы могли бы сделать, это запланировать работу, один раз в день, для сканирования "alerts-without-interventions"
для окон с вчера .Любой результат, который вы получите из хранилища окон, будет самым последним значением этого ключа.
KIP, который я готовлю, предложит вам способ отфильтровать промежуточные результаты из окна, что позволит вамприкрепить foreach к журналу изменений и запускать его только в конечном результате окна.
В качестве альтернативы, если данные для вашего приложения не слишком велики, и если вы не слишком беспокоитесь о крайних случаях, вы могли бы рассмотреть возможность реализации семантики «оконных событий» самостоятельно с помощью LinkedHashMap или кэша Guava.
Надеюсь, это поможет.