Как преобразовать несколько сообщений в одно сообщение? - PullRequest
0 голосов
/ 13 января 2019

каждый день мы будем получать сообщения ниже в Student-Topic-In

Message 1: {"StudentID": "1", "StudentName":"aaa","fatherName":"aaa1",  "class":"1"}
Message 2: {"StudentID": "2", "StudentName":"bbb","fatherName":"bbb1",  "class":"1"}
Message 3: {"StudentID": "3", "StudentName":"ccc","fatherName":"ccc1",  "class":"2"}
Message 4: {"StudentID": "4", "StudentName":"ddd","fatherName":"ddd1",  "class":"2"}
Message 5: {"StudentID": "5", "StudentName":"eee","fatherName":"eee1",  "class":"2"}

И в конце дня (один раз в день) по каждому уроку мы должны объединить все сообщения и опубликовать их в "Student-Topic-Out" в формате ниже.

Message 1:{"Class":"1"
          {"StudentID": "1", "StudentName":"aaa","fatherName":"aaa1"},
          {"StudentID": "2", "StudentName":"bbb","fatherName":"bbb1"}
       }
Message 2:{"Class":"2" 
          {"StudentID": "3", "StudentName":"ccc","fatherName":"ccc1"},
          {"StudentID": "4", "StudentName":"ddd","fatherName":"ddd1"},
          {"StudentID": "5", "StudentName":"eee","fatherName":"eee1"}
       }

Я попробовал следующее, но не знаю, как создать список учеников без имени класса?

KStream<String, Object> sampleStream = builder.stream("Student-Topic-in");
    sampleStream
            .filter((k, v) -> v != null)
            .mapValues(v -> (Student) v)
            .groupBy((k, v) -> KeyValue.pair(v.getClass_name(), v))
            .windowedBy(TimeWindows.of(5000))
            //I am not sure how to create a student list without Classname
            .aggregate(Student::new, (k, v, list) -> (Student)list.add((Student)v)

Не могли бы вы дать мне знать, как построить выходное сообщение JSON Kafka Streams?

Ответы [ 2 ]

0 голосов
/ 19 января 2019

Вы можете агрегировать сообщения в списке следующим образом:

KStream<String, Object> sampleStream = builder.stream("Student-Topic-in");
   KTable<Windowed<key>,List<Student>> aggregatedTable =  sampleStream
            .filter((k, v) -> v != null)
            .mapValues(v -> (Student) v)
            .groupBy((k, v) -> KeyValue.pair(v.getClass_name(), v))
            .windowedBy(TimeWindows.of(5000))
            //I am not sure how to create a student list without Classname
            .aggregate(ArrayList::new, (k, v, list) -> list.add((Student)v, 
               Materialized.with(keySerde(), arrayListSerde())
 )

Как только вы получите List<Student>, его можно преобразовать в любой желаемый формат с помощью функции .mapValues().

0 голосов
/ 13 января 2019

Вы можете сделать KStream.groupBy(...).windowedBy().aggregate().mapValues, используя атрибут "class" для группировки.

В Aggregator() вы можете собрать List студентов, которых вы превратите в JSON в mapValues()

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...