Исключением является то, что ваше flatMapValues
созданное значение типа String
. В вашем коде вы не передаете функцию Produced
в KStream::to
, поэтому она пытается использовать функцию по умолчанию (переданную в свойствах), которая в вашем случае равна PersonSeder.class
.
Ваши значения имеют тип String
, но PersonSeder.class
используется для сериализации.
Если вы хотите разделить его, вам нужно что-то вроде этого
KStream<String, Person> output = source
.flatMapValues(person ->
Arrays.stream(person.getName().split(","))
.map(name -> new Person(name, person.getAge()))
.collect(Collectors.toList()));
Я использовал следующий код с вашим сериализатором и десериализатором, который является симметричным (также с использованием Gson), и он работает
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, PersonSerdes.class);
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Person> source = builder.stream("input");
KStream<String, Person> output = source
.flatMapValues(person ->
Arrays.stream(person.getName()
.split(","))
.map(name -> new Person(name, person.getAge()))
.collect(Collectors.toList()));
output.to("output");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
ОБНОВЛЕНИЕ 1:
В соответствии с вашим вопросом относительно использования json вместо POJO, все зависит от ваших Sedes. Если вы используете Generic Serdes, вы можете сериализовать и десериализовать в / из Json (Карта)
Ниже приведен простой MapSerdes, который можно использовать для этого, и пример кода использования.
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.util.Map;
public class MapSerdes implements Serde<Map<String, String>> {
private static final Charset CHARSET = Charset.forName("UTF-8");
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}
@Override
public void close() {}
@Override
public Serializer<Map<String, String>> serializer() {
return new Serializer<Map<String, String>>() {
private Gson gson = new Gson();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}
@Override
public byte[] serialize(String topic, Map<String, String> data) {
String line = gson.toJson(data); // Return the bytes from the String 'line'
return line.getBytes(CHARSET);
}
@Override
public void close() {}
};
}
@Override
public Deserializer<Map<String, String>> deserializer() {
return new Deserializer<Map<String, String>>() {
private Type type = new TypeToken<Map<String, String>>(){}.getType();
private Gson gson = new Gson();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {}
@Override
public Map<String, String> deserialize(String topic, byte[] data) {
Map<String,String> result = gson.fromJson(new String(data), type);
return result;
}
@Override
public void close() {}
};
}
}
Пример использования:
Вместо имени, зависит от вашей карты, вы можете использовать различные свойства.
public class GenericJsonSplitterApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MapSerdes.class);
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Map<String, String>> source = builder.stream("input");
KStream<String, Map<String, String>> output = source
.flatMapValues(map ->
Arrays.stream(map.get("name")
.split(","))
.map(name -> {
HashMap<String, String> splittedMap = new HashMap<>(map);
splittedMap.put("name", name);
return splittedMap;
})
.collect(Collectors.toList()));
output.to("output");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}