Как отобразить мое значение потока в мой класс объекта - PullRequest
0 голосов
/ 20 октября 2019

У меня есть Song.Class (song_id, song_name, song_duration). ....

package org.example.demo.model;

public class Flux {
private int user_id;
private int song_id;
private float listening_duration;

public Flux(int user_id, int song_id, float listening_duration) {
    this.user_id = user_id;
    this.song_id = song_id;
    this.listening_duration = listening_duration;
}

...

У меня есть первая программа для отправки некоторых событий в тему kafka (сериализация в Avro):

props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", KafkaAvroSerializer.class.getName());
props.put("schema.registry.url","http://172.17.0.8:8081");
KafkaProducer<String, Flux> kafkaProducerFlux = new KafkaProducer<String, Flux>(props);

Witin loop:

Flux flux = Flux.newBuilder()
.setSongId(arraysong.get(s).getSongId())
.setUserId(arrayuser.get(u).getUserId())
.setListeningDuration(arraysong.get(s).getSongDuration())
.build();

kafkaProducerFlux.send(new ProducerRecord<String, Flux>(topic_events, flux));

....

Теперь я хотел бы вернуть свой объект в потоке как:

KStream<String, Flux> source = builder.stream("music_flux");
KStream<String, GenericRecord> source = builder.stream("music_flux");

Чтобы сделать ставкувозможность присоединиться к user_id с входами из других потоков.

Спасибо за вашу помощь.

1 Ответ

0 голосов
/ 20 октября 2019

Вам необходимо настроить десериализатор ключа и значения в свойствах потока kafka. Поскольку схема Flux записывается в реестр схем через вашего производителя Kafka, приложение Kstream будет читать схему оттуда. Вам необходимо настроить « SCHEMA_REGISTRY_URL_CONFIG » и « VALUE_SERDE_CLASS_CONFIG », как указано ниже:

KStreamBuilder builder = new KStreamBuilder();

Properties props= new Properties();
// Set a few key parameters
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-app-1");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://172.17.0.8:8081");
StreamsConfig config = new StreamsConfig(props);

KStream<String, Flux> source = builder.stream("music_flux");
...