Когда присоединяется к KStream
с GlobalKTable
, вы можете использовать части ключа и значение KStream,
, но в конечном итоге оно должно совпадать со всем ключом GlobalKTable
, поэтому К сожалению, вы не можете сделать то, что вы сказали выше, с помощью объединения.
Но вы все равно должны быть в состоянии сделать что-то близкое к этому, даже используя DSL. Если вы использовали KStream.transformValues
с ValueTransformerWithKeySupplier
, вы можете отсканировать хранилище состояний и извлечь нужные записи на основе подстроки, содержащейся в записи потока. Кроме того, вам не обязательно сканировать весь магазин, а вместо этого использовать запрос диапазона .
РЕДАКТИРОВАТЬ: Вот код, который я получил, чтобы продемонстрировать, к чему я стремлюсь.
@SuppressWarnings("unchecked")
public class MultiResultJoinExample {
public static void main(String[] args) {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "mult-partial-key-join-results");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
final StreamsBuilder builder = new StreamsBuilder();
final String storeName = "kv-store";
final StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName),
Serdes.String(),
Serdes.String());
builder.addStateStore(keyValueStoreBuilder);
final KStream<String, String> streamToJoinAgainst = builder.stream("to-join-input", Consumed.with(Serdes.String(), Serdes.String() ));
streamToJoinAgainst.transformValues(new StoringValueTransformer(storeName), storeName);
final KStream<String, String> streamNeedingJoin = builder.stream("need-join-input", Consumed.with(Serdes.String(), Serdes.String()));
streamNeedingJoin.flatTransformValues(new FlatMapJoiningTransformer(storeName), storeName).to("output", Produced.with(Serdes.String(), Serdes.String()));
final KafkaStreams streams = new KafkaStreams(builder.build(props), props);
streams.start();
}
static final class FlatMapJoiningTransformer implements ValueTransformerWithKeySupplier<String, String, Iterable<String>> {
final String storeName;
public FlatMapJoiningTransformer(String storeName) {
this.storeName = storeName;
}
@Override
public ValueTransformerWithKey<String, String, Iterable<String>> get() {
return new ValueTransformerWithKey<String, String, Iterable<String>>() {
private KeyValueStore<String, String> kvStore;
@Override
public void init(ProcessorContext<Void, Void> context) {
kvStore = (KeyValueStore<String, String>) context.getStateStore(storeName);
}
@Override
public Iterable<String> transform(String readOnlyKey, String value) {
List<String> results = new ArrayList<>();
final String patternToMatch = readOnlyKey.substring(4, 7);
try (KeyValueIterator<String, String> iter = kvStore.all()) {
while(iter.hasNext()) {
final KeyValue<String, String> kv = iter.next();
if (kv.key.contains(patternToMatch) || kv.value.contains(patternToMatch)){
results.add(kv.value + " - " + value);
}
}
}
return results;
}
@Override
public void close() {
}
};
}
}
static final class StoringValueTransformer implements ValueTransformerWithKeySupplier<String, String, String> {
final String storeName;
public StoringValueTransformer(String storeName) {
this.storeName = storeName;
}
@Override
public ValueTransformerWithKey<String, String, String> get() {
return new ValueTransformerWithKey<String, String, String>(){
private KeyValueStore<String, String> kvStore;
@Override
public void init(ProcessorContext<Void, Void> context) {
kvStore = (KeyValueStore<String, String>)context.getStateStore(storeName);
}
@Override
public String transform(String readOnlyKey, String value) {
kvStore.putIfAbsent(readOnlyKey, value);
return value;
}
@Override
public void close() {
//no-op
}
};
}
}
}
HTH, Билл