Я изучаю Kafka Streams и пытаюсь достичь следующего:
Создано 2 темы Kafka (скажем topic1, topic2) с нулевым ключом и JSONString в качестве значения.Данные из theme1 (без дубликатов) содержат несколько совпадающих записей в topic2.Т.е. topic1 имеет некоторые основные данные для генерации нового множественного потока данных при объединении с topic2.
Пример:
topic1={"name": "abc", "age":2}, {"name": "xyz", "age":3} and so on.
topic2={"name": "abc", "address"="xxxxxx"}, {"name": "abc", "address"="yyyyyy"}, {"name": "xyz", "address"="jjjjjj"}, {"name": "xyz", "address"="xxxkkkkk"}
Ожидаемый результат: {"name": "abc", "age":2, "address"="xxxxxx"}, {"name": "abc", "age":2, "address"="yyyyyy"}, {"name": "xyz", "age":3, "address"="jjjjjj"}, {"name": "xyz", "age":3, "address"="xxxkkkkk"}
Хотелось бысохранить / сохранить поток данных из topic1 для будущих ссылок, в то время как поток данных из topic2 просто используется для достижения вышеупомянутого варианта использования и не требует никакого сохранения / удержания.
У меня есть несколько вопросов: 1) Следует ли хранить / хранить поток данных topic1 в течение нескольких дней (возможно?), Чтобы можно было объединить входящий поток данных из topic2.Является ли это возможным?2) Что я должен использовать для достижения этого, KStream или KTable?3) Это называется механизмом противодавления?
Поддерживает ли Kafka Stream этот сценарий использования или мне нужно искать что-то еще?Пожалуйста, предложите.
Я пробовал часть кода с KStream с окном 5 минут, но похоже, что я не могу держать данные topic1 в потоке.
Пожалуйста, помогите мне с правильным выбором и присоединяйтесь.Я использую Kafka из экземпляра Confluent with Docker.
public void run() {
final StreamsBuilder builder = new StreamsBuilder();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);
// Hold data from this topic to 30 days
KStream<String, JsonNode> cs = builder.stream("topic1", consumed);
cs.foreach((k,v) -> {
System.out.println( k + " --->" + v);
});
// Data is involved in one time process.
KStream<String, JsonNode> css = builder.stream("topic2", consumed);
css.foreach((k,v) -> {
System.out.println( k + " --->" + v);
});
KStream<String, JsonNode> resultStream = cs.leftJoin(css,
valueJoiner,
JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
Joined.with(
Serdes.String(), /* key */
jsonSerde, /* left value */
jsonSerde) /* right value */
);
resultStream.foreach((k, v) -> {
System.out.println("JOIN STREAM: KEY="+k+ ", VALUE=" + v);
});
KafkaStreams streams = new KafkaStreams(builder.build(), properties);
streams.start();
}