Я должен потреблять json данных, поступающих в поток kafka и отправлять в разные темы (отличное сочетание идентификатора приложения и сущности) для дальнейшего использования.
topi c names:
app1.entity1
app1.entity2
app2.entity1
app2.entity2
Json Данные
[
{
"appId": "app1",
"entity": "entity1",
"extractType": "txn",
"status": "success",
"fileId": "21151235"
},
{
"appId": "app1",
"entity": "entity2",
"extractType": "txn",
"status": "fail",
"fileId": "2134234123"
},
{
"appId": "app2",
"entity": "entity3",
"extractType": "payment",
"status": "success",
"fileId": "2312de23e"
},
{
"appId": "app2",
"entity": "entity3",
"extractType": "txn",
"status": "fail",
"fileId": "asxs3434"
}
]
TestInput. java
private String appId;
private String entity ;
private String extractType;
private String status;
private String fileId;
setter/gtter
SpringBootConfig. java
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs(KafkaProperties kafkaProperties) {
Map<String, Object> config = new HashMap<>();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
config.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getClientId());
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JsonSerde<>(TestInput.class).getClass());
config.put(JsonDeserializer.DEFAULT_KEY_TYPE, String.class);
config.put(JsonDeserializer.DEFAULT_VALUE_TYPE, TestInput.class);
return new KafkaStreamsConfiguration(config);
}
@Bean
public KStream<String, TestInput> kStream(StreamsBuilder kStreamBuilder) {
KStream<String, TestInput> stream = kStreamBuilder.stream(inputTopic);
// how to form key , group records and send to different topics
return stream;
}
Я много искал, но ничего не нашел рядом с которой публикует данные по темам динамически. Пожалуйста, помогите экспертам