Я новичок в Kafka и Kafka Streams. Я пытаюсь выполнить объединение (join) двух потоков и отправить (push) результат в третий поток. Несколько дней я пробовал различные реализации и теперь застрял на этой ошибке и не могу продвинуться дальше. Может кто-нибудь помочь, пожалуйста?
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST controller that builds and starts a Kafka Streams topology which inner-joins
 * two topics of {@link Item} values (both re-keyed by item name) within a 5-minute
 * window and writes the joined records to an output topic.
 */
@RestController
public class KafkaProcessingController {

    /** The Streams instance started by the last request, or {@code null} if none yet. */
    private KafkaStreams streamsInnerJoin;

    /**
     * Builds the Streams configuration.
     *
     * <p>The default key and value serdes are both String; any operator that handles
     * {@link Item} values must therefore be given explicit serdes.
     *
     * @return a fresh {@link Properties} object with application id, bootstrap servers
     *     and default serdes set
     */
    private Properties properties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-stream-inner-join");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }

    /**
     * Closes the previously started Streams instance (if any), then builds the topology
     * from {@code builder} and starts a new instance.
     *
     * @param builder the builder holding the topology definition
     */
    private void streamsInnerJoinStart(StreamsBuilder builder) {
        if (streamsInnerJoin != null) {
            streamsInnerJoin.close();
        }
        final Topology topology = builder.build();
        streamsInnerJoin = new KafkaStreams(topology, properties());
        streamsInnerJoin.start();
    }

    /**
     * Defines the stream-stream inner join topology and starts it.
     *
     * <p>Both inputs are re-keyed by {@link Item#getName()} before joining; the joiner
     * logs both names and emits the right-hand value.
     */
    @RequestMapping("/startStreamStreamInnerJoin2/")
    public void startStreamStreamInnerJoin2() {
        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, Item> leftSource = builder.stream(
                "my-kafka-left-stream-topic",
                Consumed.with(Serdes.String(), new SchoolSerde()));
        KStream<String, Item> rightSource = builder.stream(
                "my-kafka-right-stream-topic",
                Consumed.with(Serdes.String(), new SchoolSerde()));

        KStream<String, Item> joined = leftSource
                .selectKey((key, value) -> value.getName())
                .join(
                        rightSource.selectKey((key, value) -> value.getName()),
                        (value1, value2) -> {
                            System.out.println("value2.getName() >> " + value1.getName() + value2.getName());
                            return value2;
                        },
                        JoinWindows.of(Duration.ofMinutes(5)),
                        Joined.with(
                                Serdes.String(),
                                new SchoolSerde(),
                                new SchoolSerde()));

        // The sink must be given explicit serdes: without Produced it falls back to the
        // default String value serde from properties(), which cannot serialize Item and
        // fails with "A serializer ... is not compatible to the actual key or value type".
        joined.to(
                "my-kafka-stream-stream-inner-join-out",
                Produced.with(Serdes.String(), new SchoolSerde()));

        streamsInnerJoinStart(builder);
    }

    /**
     * Serde for {@link Item} values, composed of a {@link JsonSerializer} and a
     * {@link JsonDeserializer} targeting {@code Item.class}.
     */
    public class SchoolSerde extends Serdes.WrapperSerde<Item> {
        public SchoolSerde() {
            super(new JsonSerializer<>(), new JsonDeserializer<>(Item.class));
        }
    }
}
Вот что у меня лежит в топиках Kafka (содержимое одинаково для обоих топиков):
CreateTime:1588414271850 1 {"id":1,"name":"nuwan","category":"home"}
Это Item.java:
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
 * Mutable value object with an id, a name and a category, mapped to and from JSON
 * via Jackson. Unknown JSON properties are ignored on deserialization.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class Item {

    private int id;
    private String name;
    private String category;

    /**
     * Creates an item; used by Jackson as the deserialization entry point.
     *
     * @param id value of the {@code id} JSON property
     * @param name value of the {@code name} JSON property
     * @param category value of the {@code category} JSON property
     */
    @JsonCreator
    public Item(
            @JsonProperty("id") int id,
            @JsonProperty("name") String name,
            @JsonProperty("category") String category) {
        this.id = id;
        this.name = name;
        this.category = category;
    }

    /** @return the item id */
    public int getId() {
        return this.id;
    }

    /** @param id the new item id */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the item name */
    public String getName() {
        return this.name;
    }

    /** @param name the new item name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the item category */
    public String getCategory() {
        return this.category;
    }

    /** @param category the new item category */
    public void setCategory(String category) {
        this.category = category;
    }

    /**
     * Renders the item as {@code Item{id=..., name='...', category='...'}}.
     *
     * @return the textual form of this item
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Item{");
        text.append("id=").append(id);
        text.append(", name='").append(name).append('\'');
        text.append(", category='").append(category).append('\'');
        text.append('}');
        return text.toString();
    }
}
Итоговая ошибка:
Exception in thread "stream-stream-inner-join-0816e64b-5e97-4ca1-bb08-5976d3506e33-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=2_0, processor=KSTREAM-SOURCE-0000000006, topic=stream-stream-inner-join-KSTREAM-KEY-SELECT-0000000002-repartition, partition=0, offset=5, stacktrace=org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: ex4.Item). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.