Путаница в реализации KStream / Table в Spring Boot - PullRequest
0 голосов
/ 20 июня 2019

Я пытаюсь получить пример работающего "действия" потока kafka в Spring-Boot, и я, кажется, в итоге запутался:)

Я получаю данные JSON по проводам. Я построил схему в avro , которую я использую для сериализации данных:


{
  "UID": "XJ3_112",
  "type": "11X",
  "state": "PLATFORM_INITIALIZED",
  "fuelremaining": 0,
  "latitude": 50.1232,
  "longitude": -119.257,
  "altitude": 0,
  "time": "2018-07-18T00:00:13.9966Z"
}

{
  "platformUID": "BSG_SS_1_4",
  "type": "OB_334_11",
  "state": "ON_STATION",
  "fuelremaining": -1,
  "latitude": 56.1623,
  "longitude": -44.5614,
  "altitude": 519174,
  "time": "2018-07-18T00:01:43.0871Z"
}

Насколько я знаю:

@Component
class KStreamTransformer {

    @Autowired
    private lateinit var objectMapper: ObjectMapper

    @StreamListener(MyKafkaStreams.INPUT)
    @SendTo(MyKafkaStreams.OUTPUT)
    fun process(input: KStream<*, TestEntity>) : KStream<*, TestEntity> {

        return input.flatMapValues{
            value ->
            val out = Arrays.asList(value)

            out
        }.groupBy() ???
    }
}

Я надеюсь создать таблицу KTable, которая выглядит следующим образом:

| platformUID | состояние | Lat | Lon | Alt | | ----------- | ----- | --- | --- | --- |

И вот тут я запутался.

Я предполагаю, что хочу сделать GroupBy в поле PlatformUID, но мне неясно, как на самом деле двигаться вперед.

Может ли кто-нибудь указать мне правильное направление?

Я думаю, что мне нужно взять поток input и превратить его в таблицу KTable с ключом value.getUID() и значением, которое было до

1 Ответ

2 голосов
/ 21 июня 2019

Если platformUID уже является ключом, который использует производитель данных, вы можете использовать

KTable ktable = kstream
                 .groupByKey()
                 .reduce((oldValue, newValue) -> newValue)

Если нет, KeyValueMapper должен быть вставлен в .groupBy(), и он выглядит как

KTable ktable = kstream
                 .groupBy((k, v) -> v.getPlatformUID())
                 .reduce((oldValue, newValue) -> newValue)

Будет создана внутренняя тема, которая перераспределяет исходную тему с новым ключом.

Для Java 7 синтаксис KeyValueMapper следующий:

KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() {
        public K apply(K key, V1 value) {
            return key;
        }
    };
...