У меня есть входящие данные для автомобилей и я хочу создать новое событие для поездок. У меня есть следующее сообщение json из темы транспортного средства:
{ asset_id: ‘123’, location: [11.00, 12.00], journey_start: true }
{ asset_id: ‘123’, location: [12.00, 12.00] }
{ asset_id: ‘123’, location: [13.00, 12.00], journey_end: true }
От начала события путешествия до конца события путешествия он должен собирать местоположения, и если путешествие закончилось, он должен отправлять его в путешествие. тема мероприятия. В предыдущем примере сообщение должно выглядеть так:
{ asset_id: ‘123’, locations: [[11.00, 12.00], [12.00, 12.00], [13.00, 12.00]] }
Как только наступит новое событие начала поездки, оно должно отбросить предыдущее событие поездки из локального магазина.
Это то, что я имею до сих пор,Я не знаю, как выполнить агрегацию, добавить условную отправку в тему события поездки и удалить старые данные о местоположении:
public class Stream {
public static void main(String[] args) {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, JsonNode> vehicleEvent = builder.stream(
"vehicle-event",
Consumed.with(
Serdes.String(),
jsonSerde
)
);
KStream<String, JsonNode> journeyEvent = vehicleEvent
.groupBy((key, value) -> {
JSONObject jsonObj = new JSONObject(value);
return jsonObj.getString("asset_id");
})
.aggregate()
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();
}
}