Одновременные операции на KStream & KTables - PullRequest
2 голосов
/ 05 апреля 2020

Я пытаюсь реализовать сценарий использования в потоках Kafka, где я заполняю Ktable на основе применения некоторых фильтров к этому потоку, давайте назовем эту таблицу таблицей отслеживания, в которой ключ получен из события, а значение - это событие , Теперь для последующих событий я проверяю эту таблицу, чтобы убедиться, что они отслежены, и обновляю событие, если оно отслеживается, или публикует sh его в другой топи c. Я не уверен, как сделать это одновременно. Вот что у меня есть.

// Branch based on conditions
KStream<String, Event>[] segregatedRecords = branches[0]
                       .branch((key, event) -> event.getStatus().getStatus().equals("A"),
                        (key, event) -> event.getStatus().getStatus().equals("B"),
                        (key, event) -> event.getStatus().getStatus().equals("C"),


// Store events with status A to a topic
segregatedRecords[0]
                .selectKey((key, event) -> createKey(event))
                .mapValues(transform)
                .to(intermediateTopic);

// Load topic from previous step as GlobalKtable
GlobalKTable<String, Event> trackedEvents = streamsBuilder.globalTable(intermediateTopic);

// The following step is where I'm stuck, because I can not perform conditional actions
// If the event exists in the tracking table (update) but if not then how to publish it to a different topic?
segregatedRecords[1]
                 // derive key for lookup
                .selectKey((key, event) -> createKey(event))
                // update the event status in the table 
                .join(trackedEvents, (key, value) -> key,(event, tracked) -> modifiedEvent
                ).to(intermediateTopic);

// Other events will need to refer the latest information in the tracked table for further processing 


1 Ответ

2 голосов
/ 05 апреля 2020

Вы можете сделать это, разветвив segregatedRecords[1] на 2 под-топологии, одна ветвь выполняет блокировку таблицы как ваш код, а другая ветвь использует API-интерфейс процессора низкого уровня (в данном случае используя transformValues), чтобы проверить, является ли базовый * Хранилище состояний 1002 * содержит запись для нового производного ключа. Если запись существует, преобразуйте Событие в null Событие, затем мы отфильтруем событие с нулевым значением Event (потому что те события, к которым мы уже присоединились в вашей первой подпрограмме -Топология). Я немного обновил ваш код:

//give your GlobalKTable a name to query later
GlobalKTable<String, Event> trackedEvents = streamsBuilder.globalTable(intermediateTopic, Materialized.as("tracked_event_global_store"));

KStream<String, Event> derivedKStream = segregatedRecords[1]
    // derive key for lookup
    .selectKey((key, event) -> createKey(event));
// this sub-topology perform table lockup as normal: update the event status in the table
derivedKStream.join(trackedEvents, (key, value) -> key,(event, tracked) -> modifiedEvent)
    .to(intermediateTopic);
// this sub-topology check whether the event existed in trackedEvents, if yes then event has been already joined 
// so we transform to null value and filter in next step 
derivedKStream.transformValues(() -> new ValueTransformerWithKey<String, Event, Event>() {
    //get the underlying store of Tracked GlobalKTable
    KeyValueStore<String, Event> trackedKvStore;
    @Override
    public void init(ProcessorContext context) {
        //using the previous name
        trackedKvStore = (KeyValueStore<String, Event>) context.getStateStore("tracked_event_global_store");
    }

    @Override
    public Event transform(String derivedKey, Event event) {
        //if event existed in trackedEvents then return a null event so we can filter out in next pipe
        if (trackedKvStore.get(derivedKey) != null) {
            return null;
        }
        //event not exist in trackedEvents, keep the event and send to different topic
        return event;
    }

    @Override
    public void close() {
    }
})
.filter((derivedKey, event) -> event != null)
.to("your different toic name");

Обновление : о проблеме, при которой вы не можете создать как GlobalKTable, так и KStream из одной топи c intermediate (можете не читать topi c несколько раз , как описано здесь ):

  1. Создать выделенный ввод topi c для GlobalKTable (эта топи c должна иметь журнал сжатие включено):
KStream<Object, Object> intermediateKStream = streamsBuilder.stream(intermediate);
intermediateKStream.to(trackedInputTopic);
//instead of building GlobalKTable from intermediate, use this dedicated topic trackedInputTopic
GlobalKTable<String, Event> trackedEvents = streamsBuilder.globalTable(trackedInputTopic, Materialized.as("tracked_event_global_store"));

//Perform things you want to do with the intermediate topic
intermediateKStream
        ...
...