Я хочу перезаписать GlobalKTable (возможно, при его инициализации, поскольку я считаю, что они читаются только после создания).
Возможно ли это?
У меня есть две темы, с которыми я работаю в приложении Spring / Java Kafka Streams. Первое не уплотняется, второе есть. Оба используют Avro для своих ключей и значений.
Приложение выполняет потоковую передачу записей из первой (не уплотненной) темы и присоединяет дополнительные данные из уплотненной темы через KStream#leftJoin
. Уплотненная тема была перенесена в приложение в виде GlobalKTable, созданной с помощью StreamsBuilder#globalTable()
и должна оставаться такой (мне нужны все записи из всех разделов темы, доступных в каждом экземпляре приложения).
Я знаю, что говорят о поддержке соединений не первичного ключа (https://issues.apache.org/jira/browse/KAFKA-3705),, но, насколько мне известно, я пока не могу этого сделать ...
@Configuration
@EnableKafkaStreams
public class StreamsConfig {
@Autowired
private MyCustomSerdes serdes;
@Bean
public KStream<AvroKeyOne, AvroValueOne> reKeyJoin(StreamsBuilder streamsBuilder) {
GlobalKTable<AvroKeyOne, AvroValueOne> globalTable = streamsBuilder.globalTable("topicOne", Consumed.with(
serdes.getAvroKeyOne()
serdes.getAvroValueOne()
));
KStream<AvroKeyTwo, AvroValueOne> kStream = streamsBuilder.stream("topicTwo", Consumed.with(
serdes.getAvroKeyTwo(),
serdes.getAvroValueOne()
));
kStream.join(
globalTable,
/**
* the KeyValueMapper. I need to rekey the Global table as well to the
* corresponding String (which it's data will have) if I want this join
* to return results
*/
(streamKey, streamValue) -> {return streamKey.getNewStringKey()},
(/**ValueJoiner Deal**/)
);
}
}