Вы можете сделать это, разветвив segregatedRecords[1]
на 2 под-топологии, одна ветвь выполняет блокировку таблицы как ваш код, а другая ветвь использует API-интерфейс процессора низкого уровня (в данном случае используя transformValues), чтобы проверить, является ли базовый * Хранилище состояний 1002 * содержит запись для нового производного ключа. Если запись существует, преобразуйте Событие в null
Событие, затем мы отфильтруем событие с нулевым значением Event
(потому что те события, к которым мы уже присоединились в вашей первой подпрограмме -Топология). Я немного обновил ваш код:
//give your GlobalKTable a name to query later
GlobalKTable<String, Event> trackedEvents = streamsBuilder.globalTable(intermediateTopic, Materialized.as("tracked_event_global_store"));
KStream<String, Event> derivedKStream = segregatedRecords[1]
// derive key for lookup
.selectKey((key, event) -> createKey(event));
// this sub-topology perform table lockup as normal: update the event status in the table
derivedKStream.join(trackedEvents, (key, value) -> key,(event, tracked) -> modifiedEvent)
.to(intermediateTopic);
// this sub-topology check whether the event existed in trackedEvents, if yes then event has been already joined
// so we transform to null value and filter in next step
derivedKStream.transformValues(() -> new ValueTransformerWithKey<String, Event, Event>() {
//get the underlying store of Tracked GlobalKTable
KeyValueStore<String, Event> trackedKvStore;
@Override
public void init(ProcessorContext context) {
//using the previous name
trackedKvStore = (KeyValueStore<String, Event>) context.getStateStore("tracked_event_global_store");
}
@Override
public Event transform(String derivedKey, Event event) {
//if event existed in trackedEvents then return a null event so we can filter out in next pipe
if (trackedKvStore.get(derivedKey) != null) {
return null;
}
//event not exist in trackedEvents, keep the event and send to different topic
return event;
}
@Override
public void close() {
}
})
.filter((derivedKey, event) -> event != null)
.to("your different toic name");
Обновление : о проблеме, при которой вы не можете создать как GlobalKTable, так и KStream из одной топи c intermediate
(можете не читать topi c несколько раз , как описано здесь ):
- Создать выделенный ввод topi c для
GlobalKTable
(эта топи c должна иметь журнал сжатие включено):
KStream<Object, Object> intermediateKStream = streamsBuilder.stream(intermediate);
intermediateKStream.to(trackedInputTopic);
//instead of building GlobalKTable from intermediate, use this dedicated topic trackedInputTopic
GlobalKTable<String, Event> trackedEvents = streamsBuilder.globalTable(trackedInputTopic, Materialized.as("tracked_event_global_store"));
//Perform things you want to do with the intermediate topic
intermediateKStream
...