каждый день мы будем получать сообщения ниже в Student-Topic-In
Message 1: {"StudentID": "1", "StudentName":"aaa","fatherName":"aaa1", "class":"1"}
Message 2: {"StudentID": "2", "StudentName":"bbb","fatherName":"bbb1", "class":"1"}
Message 3: {"StudentID": "3", "StudentName":"ccc","fatherName":"ccc1", "class":"2"}
Message 4: {"StudentID": "4", "StudentName":"ddd","fatherName":"ddd1", "class":"2"}
Message 5: {"StudentID": "5", "StudentName":"eee","fatherName":"eee1", "class":"2"}
И в конце дня (один раз в день) по каждому уроку мы должны объединить все сообщения и опубликовать их в "Student-Topic-Out" в формате ниже.
Message 1:{"Class":"1"
{"StudentID": "1", "StudentName":"aaa","fatherName":"aaa1"},
{"StudentID": "2", "StudentName":"bbb","fatherName":"bbb1"}
}
Message 2:{"Class":"2"
{"StudentID": "3", "StudentName":"ccc","fatherName":"ccc1"},
{"StudentID": "4", "StudentName":"ddd","fatherName":"ddd1"},
{"StudentID": "5", "StudentName":"eee","fatherName":"eee1"}
}
Я попробовал следующее, но не знаю, как создать список учеников без имени класса?
KStream<String, Object> sampleStream = builder.stream("Student-Topic-in");
sampleStream
.filter((k, v) -> v != null)
.mapValues(v -> (Student) v)
.groupBy((k, v) -> KeyValue.pair(v.getClass_name(), v))
.windowedBy(TimeWindows.of(5000))
//I am not sure how to create a student list without Classname
.aggregate(Student::new, (k, v, list) -> (Student)list.add((Student)v)
Не могли бы вы дать мне знать, как построить выходное сообщение JSON Kafka Streams?