Я новичок в потоках kafka и хотел бы прочитать тему и написать ее часть в новой теме, используя API-интерфейсы kafka streams.
Мой ключ - строка, а значение - Avro.
Есть ли документация / пример, который я могу использовать?
Редактировать:
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, GenericRecord> inputStream = builder.stream("Test_CX_TEST_KAFKA_X");
final KStream<String, String> newStream = inputStream.mapValues(value -> value.get("ID").toString());
newStream.to("SUB_TOPIC",Produced.with(Serdes.String(),Serdes.String()));
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
В SUB_TOPIC у меня есть:
Key:
{
"ID": "145"
}
Отметка:
14 марта 2019 г. 17: 52: 23.43
смещение:
12
Раздел:
0
Моя тема ввода:
{
«ID»: «145»,
"TIMESTAMP": 1552585938545,
"НЕДЕЛЯ": "\ u0000",
"ИСТОЧНИК": {
"строка": "ТМП"
},
"ТЕЛО": {
"string": "{\" operation_type \ ": \" INSERT \ ", \" old \ ": {\" ROW_ID \ ": null, \" LAST_UPD \ ": null, \" DENOMINATION \ ": null, \ "SIREN_SIRET \": нулевой}, \ "новый \": {\ "ROW_ID \": \ "170309 - ******** \", \ "LAST_UPD \": \ "2019-03-14T17: 52:18 \ "\ "ОБОЗНАЧЕНИЕ \": \ "1 - ****** \", \ "SIREN_SIRET \": нулевая}}"
},
"TYPE_ACTION": {
"string": "INSERT"
}
}
Как я могу добавить другие поля из тела в новой теме?
пример:
{
«ID»: «145»,
"TIMESTAMP": 1552585938545,
"НЕДЕЛЯ": "\ u0000",
"ИСТОЧНИК": {
"строка": "ТМП"
},
"ТЕЛО": {
"string": "{\" operation_type \ ": \" INSERT \ ", \" old \ ": {\" ROW_ID \ ": null, \" LAST_UPD \ ": null}, \" new \ ": {\ "ROW_ID \": \ "170309 - ******** \", \ "LAST_UPD \": \ "2019-03-14T17: 52: 18 \"}}»
},
"TYPE_ACTION": {
"string": "INSERT"
}
}