У меня есть Song.Class
(song_id, song_name, song_duration
). ....
package org.example.demo.model;
public class Flux {
private int user_id;
private int song_id;
private float listening_duration;
public Flux(int user_id, int song_id, float listening_duration) {
this.user_id = user_id;
this.song_id = song_id;
this.listening_duration = listening_duration;
}
...
У меня есть первая программа для отправки некоторых событий в тему kafka (сериализация в Avro):
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", KafkaAvroSerializer.class.getName());
props.put("schema.registry.url","http://172.17.0.8:8081");
KafkaProducer<String, Flux> kafkaProducerFlux = new KafkaProducer<String, Flux>(props);
Witin loop:
Flux flux = Flux.newBuilder()
.setSongId(arraysong.get(s).getSongId())
.setUserId(arrayuser.get(u).getUserId())
.setListeningDuration(arraysong.get(s).getSongDuration())
.build();
kafkaProducerFlux.send(new ProducerRecord<String, Flux>(topic_events, flux));
....
Теперь я хотел бы вернуть свой объект в потоке как:
KStream<String, Flux> source = builder.stream("music_flux");
KStream<String, GenericRecord> source = builder.stream("music_flux");
Чтобы сделать ставкувозможность присоединиться к user_id
с входами из других потоков.
Спасибо за вашу помощь.