Может ли соединение KStream-GlobalKTable вернуть несколько совпадающих записей для определенного поиска c? - PullRequest
1 голос
/ 23 апреля 2020

Я надеюсь, что кто-то может помочь в решении проблемы, связанной с GlobalKTables в Kafka.

Я пытаюсь выполнить соединение KStream-GlobalKTable. Тем не менее, я хочу получить все записи в GlobalKTable, чье значение или содержит строку, найденную в моем исходном событии потоковой передачи. Например, предположим, что в моей таблице 3 строки со следующими ключами:

Key: BANK055DEPOSIT value: {some data}
 Key: BANK055CREDIT value: {different data}
 Key: BANK033CREDIT value: {more different data}

Когда я выполняю объединение в таблице для извлечения данных, мне нужно извлечь все строки, ключ которых или значение содержит «055». Так что я бы хотел первые 2 ряда.

В мире баз данных это было бы эквивалентно следующему:

SELECT * FROM GlobalKTable where table_key.contains("055”) OR table_value.contains(“055”)

Я изучил официальные документы и не нашел примеров того, как это сделать. У меня есть подозрение, что получение N числа строк из соединения GlobalKTable недостижимо.

Кроме того, для этого я использую потоки DSL. Не уверен, что это будет достигнуто с помощью Processor API. Любой вклад приветствуется!

1 Ответ

1 голос
/ 24 апреля 2020

Когда присоединяется к KStream с GlobalKTable, вы можете использовать части ключа и значение KStream,, но в конечном итоге оно должно совпадать со всем ключом GlobalKTable, поэтому К сожалению, вы не можете сделать то, что вы сказали выше, с помощью объединения.

Но вы все равно должны быть в состоянии сделать что-то близкое к этому, даже используя DSL. Если вы использовали KStream.transformValues с ValueTransformerWithKeySupplier, вы можете отсканировать хранилище состояний и извлечь нужные записи на основе подстроки, содержащейся в записи потока. Кроме того, вам не обязательно сканировать весь магазин, а вместо этого использовать запрос диапазона .

РЕДАКТИРОВАТЬ: Вот код, который я получил, чтобы продемонстрировать, к чему я стремлюсь.

@SuppressWarnings("unchecked")
public class MultiResultJoinExample {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mult-partial-key-join-results");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final StreamsBuilder builder = new StreamsBuilder();

        final String storeName = "kv-store";
        final StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
                        Serdes.String(),
                        Serdes.String());
        builder.addStateStore(keyValueStoreBuilder);

        final KStream<String, String> streamToJoinAgainst = builder.stream("to-join-input", Consumed.with(Serdes.String(), Serdes.String() ));

        streamToJoinAgainst.transformValues(new StoringValueTransformer(storeName), storeName);

        final KStream<String, String> streamNeedingJoin = builder.stream("need-join-input", Consumed.with(Serdes.String(), Serdes.String()));

        streamNeedingJoin.flatTransformValues(new FlatMapJoiningTransformer(storeName), storeName).to("output", Produced.with(Serdes.String(), Serdes.String()));

        final KafkaStreams streams = new KafkaStreams(builder.build(props), props);
        streams.start();
    }

    static final class FlatMapJoiningTransformer implements ValueTransformerWithKeySupplier<String, String, Iterable<String>> {
        final String storeName;

        public FlatMapJoiningTransformer(String storeName) {
            this.storeName = storeName;
        }

        @Override
        public ValueTransformerWithKey<String, String, Iterable<String>> get() {
            return new ValueTransformerWithKey<String, String, Iterable<String>>() {
               private KeyValueStore<String, String> kvStore;
                @Override
                public void init(ProcessorContext<Void, Void> context) {
                    kvStore = (KeyValueStore<String, String>) context.getStateStore(storeName);
                }

                @Override
                public Iterable<String> transform(String readOnlyKey, String value) {
                      List<String> results = new ArrayList<>();
                      final String patternToMatch = readOnlyKey.substring(4, 7);
                      try (KeyValueIterator<String, String> iter =  kvStore.all()) {
                           while(iter.hasNext()) {
                               final KeyValue<String, String> kv = iter.next();
                                if (kv.key.contains(patternToMatch) || kv.value.contains(patternToMatch)){
                                    results.add(kv.value + " - " + value);
                                }
                           }
                      }
                      return results;
                }

                @Override
                public void close() {

                }
            };
        }
    }

    static final class StoringValueTransformer implements ValueTransformerWithKeySupplier<String, String, String> {
        final String storeName;

        public StoringValueTransformer(String storeName) {
            this.storeName = storeName;
        }

        @Override
        public ValueTransformerWithKey<String, String, String> get() {
            return new ValueTransformerWithKey<String, String, String>(){
                private KeyValueStore<String, String> kvStore;
                @Override
                public void init(ProcessorContext<Void, Void> context) {
                       kvStore = (KeyValueStore<String, String>)context.getStateStore(storeName);
                }

                @Override
                public String transform(String readOnlyKey, String value) {
                    kvStore.putIfAbsent(readOnlyKey, value);
                    return value;
                }

                @Override
                public void close() {
                     //no-op
                }
            };
        }
    }
}

HTH, Билл

...