Я пытаюсь получить пример работающего "действия" потока kafka в Spring-Boot, и я, кажется, в итоге запутался:)
Я получаю данные JSON по проводам. Я построил схему в avro , которую я использую для сериализации данных:
{
"UID": "XJ3_112",
"type": "11X",
"state": "PLATFORM_INITIALIZED",
"fuelremaining": 0,
"latitude": 50.1232,
"longitude": -119.257,
"altitude": 0,
"time": "2018-07-18T00:00:13.9966Z"
}
{
"platformUID": "BSG_SS_1_4",
"type": "OB_334_11",
"state": "ON_STATION",
"fuelremaining": -1,
"latitude": 56.1623,
"longitude": -44.5614,
"altitude": 519174,
"time": "2018-07-18T00:01:43.0871Z"
}
Насколько я знаю:
@Component
class KStreamTransformer {
@Autowired
private lateinit var objectMapper: ObjectMapper
@StreamListener(MyKafkaStreams.INPUT)
@SendTo(MyKafkaStreams.OUTPUT)
fun process(input: KStream<*, TestEntity>) : KStream<*, TestEntity> {
return input.flatMapValues{
value ->
val out = Arrays.asList(value)
out
}.groupBy() ???
}
}
Я надеюсь создать таблицу KTable, которая выглядит следующим образом:
| platformUID | состояние | Lat | Lon | Alt |
| ----------- | ----- | --- | --- | --- |
И вот тут я запутался.
Я предполагаю, что хочу сделать GroupBy
в поле PlatformUID
, но мне неясно, как на самом деле двигаться вперед.
Может ли кто-нибудь указать мне правильное направление?
Я думаю, что мне нужно взять поток input
и превратить его в таблицу KTable с ключом value.getUID()
и значением, которое было до