Вот что я пытаюсь сделать:
Я пытаюсь использовать сообщения в 5-минутном временном окне и подсчитывать количество полученных сообщений.После этого я хочу скопировать последнее сообщение, поступившее в группу, скопировать его, отредактировать его поле «count», чтобы оно соответствовало количеству сообщений в временном интервале, и отправить его в тему.
Hoever,У меня проблемы с оконной частью, мне нужна помощь в этом.Я сделал свой собственный JSON Serdes (MessageSerde), и Class Class представляет сообщение JSON, но я не уверен, что я должен сейчас делать в отношении процесса создания окон.
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, Message> inputStream =
builder
.stream("users", Consumed.with(Serdes.String(), MessageSerde));
inputStream
.filter(new Predicate<String, Message>() {
@Override
public boolean test(String s, Message s2) {
return s2.getPriority().equals("Low");
}
})
KTable<Windowed<String>, Message> fiveMinuteWindowed = inputStream
.groupByKey(Serialized.with(Serdes.String(), MessageSerde))
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
.count()
.to("test-filter", Produced.with(Serdes.String(), MessageSerde));
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
Обновление
После предложения Матиаса я создал этот код, но столкнулся с несколькими ошибками компиляции, в методе .to:
inputStream
.filter(new Predicate<String, Message>() {
@Override
public boolean test(String s, Message s2) {
return s2.getPriority().equals("Low");
}
})
.groupByKey(Serialized.with(Serdes.String(), MessageSerde))
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)))
.aggregate(
new Initializer<Message>() { /* initializer */
@Override
public Message apply() {
Message tuple = new Message();
return tuple;
}
},
new Aggregator<String, Message, Message>() { /* adder */
@Override
public Message apply(String aggKey, Message curMsg, Message tuple) {
tuple.setCount(tuple.getCount() + 1);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss.SSS");
try{
Date parsedDate_cur = dateFormat.parse(curMsg.getTimestamp());
Timestamp timestamp_cur = new java.sql.Timestamp(parsedDate_cur.getTime());
Date parsedDate_agg = dateFormat.parse(tuple.getTimestamp());
Timestamp timestamp_agg = new java.sql.Timestamp(parsedDate_agg.getTime());
if(timestamp_cur.after(timestamp_agg)){
tuple.setTimestamp(timestamp_cur.toString());
}
tuple.setId(curMsg.getId());
tuple.setSourceip(curMsg.getSourceip());
tuple.setType(curMsg.getType());
tuple.setPriority(curMsg.getPriority());
tuple.setName(curMsg.getName());
tuple.setMetadata(curMsg.getMetadata());
}
catch(Exception e){
System.out.println("Error parsing dates");
}
return tuple;
}
},
Materialized.with(Serdes.String(), MessageSerde))
.suppress(untilWindowCloses(unbounded()))
.toStream();
.to("test-filter", Produced.with(Serdes.String(), MessageSerde));
К ошибкам относятся:
Compilation failure
[ERROR] /home/x/Documents/project/src/main/java/com/streams/app/App.java:[194,9] no suitable method found for to(java.lang.String,org.apache.kafka.streams.kstream.Produced<java.lang.String,Pojo.Message>)
[ERROR]method org.apache.kafka.streams.kstream.KStream.to(java.lang.String,org.apache.kafka.streams.kstream.Produced<org.apache.kafka.streams.kstream.Windowed<java.lang.String>,Pojo.Message>) is not applicable
[ERROR](argument mismatch; inference variable K has incompatible equality constraints org.apache.kafka.streams.kstream.Windowed<java.lang.String>,java.lang.String)
[ERROR]method org.apache.kafka.streams.kstream.KStream.to(org.apache.kafka.streams.processor.TopicNameExtractor<org.apache.kafka.streams.kstream.Windowed<java.lang.String>,Pojo.Message>,org.apache.kafka.streams.kstream.Produced<org.apache.kafka.streams.kstream.Windowed<java.lang.String>,Pojo.Message>) is not applicable
[ERROR](argument mismatch; java.lang.String cannot be converted to org.apache.kafka.streams.processor.TopicNameExtractor<org.apache.kafka.streams.kstream.Windowed<java.lang.String>,Pojo.Message>)