Kafka Kstream Json Соединение не может быть приведено к java .lang.String - PullRequest
1 голос
/ 02 мая 2020

Я новичок в Kafka и Kstreams. Я пытаюсь выполнить объединение двух потоков и pu sh вывод в третий поток. Я пробовал несколько дней на различных реализациях и теперь застрял на этой ошибке. ошибка и не может продолжаться. Может кто-нибудь, пожалуйста, помогите?

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.Duration;
import java.util.Properties;

@RestController
public class KafkaProcessingController {

    private KafkaStreams streamsInnerJoin;

    private Properties properties(){
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-stream-inner-join");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return  props;
    }

    private void streamsInnerJoinStart(StreamsBuilder builder){
        if (streamsInnerJoin != null) {
            streamsInnerJoin.close();
        }
        final Topology topology = builder.build();
        streamsInnerJoin = new KafkaStreams(topology, properties());
        streamsInnerJoin.start();
    }

    @RequestMapping("/startStreamStreamInnerJoin2/")
    public void startStreamStreamInnerJoin2() {

        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Item> leftSource = builder.stream("my-kafka-left-stream-topic"
                , Consumed.with(Serdes.String(), new SchoolSerde() )        );
        KStream<String, Item> rightSource = builder.stream("my-kafka-right-stream-topic"
                , Consumed.with(Serdes.String(), new SchoolSerde() ));
        KStream<String, Item> joined= leftSource
                .selectKey((key, value) -> value.getName() )
                .join(  rightSource
                            .selectKey((key, value) -> value.getName())
                                ,(value1, value2) -> {
                                    System.out.println("value2.getName() >> "+value1.getName()+ value2.getName());
                                    return value2;}
                        ,JoinWindows.of(Duration.ofMinutes(5))
                        ,Joined.with(
                         Serdes.String(),
                         new SchoolSerde(),
                         new SchoolSerde()
                        )
                );
        joined.to("my-kafka-stream-stream-inner-join-out");
        streamsInnerJoinStart(builder);
    }

    public class SchoolSerde extends Serdes.WrapperSerde<Item> {
        public SchoolSerde () {
            super(new JsonSerializer<>(), new JsonDeserializer<>(Item.class));
        }
    }
}

Это то, что у меня есть в темах Кафки, то же самое для обеих тем

CreateTime:1588414271850    1   {"id":1,"name":"nuwan","category":"home"}

Этот пункт. java


import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonIgnoreProperties(ignoreUnknown = true)
public class Item {
    private int id;
    private String name;
    private String category;

    public int getId() {
        return id;
    }


    @JsonCreator
    public Item(@JsonProperty("id") int id, @JsonProperty("name") String name, @JsonProperty("category") String category) {
        this.id = id;
        this.name = name;
        this.category = category;
    }

    @Override
    public String toString() {
        return "Item{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", category='" + category + '\'' +
                '}';
    }


    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getCategory() {
        return category;
    }

    public void setCategory(String category) {
        this.category = category;
    }
}


Окончательная ошибка

Exception in thread "stream-stream-inner-join-0816e64b-5e97-4ca1-bb08-5976d3506e33-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=2_0, processor=KSTREAM-SOURCE-0000000006, topic=stream-stream-inner-join-KSTREAM-KEY-SELECT-0000000002-repartition, partition=0, offset=5, stacktrace=org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: ex4.Item). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

1 Ответ

1 голос
/ 02 мая 2020

Ошибка связана с тем, что для перераспределения, вызванного операцией selectKey, указаны неправильные значения Serdes.

Просто для некоторого контекста, каждый раз, когда вы меняете ключ и выполняете объединение или агрегирование, Kafka Streams автоматически перераспределяет данные, чтобы убедиться, что измененный ключ находится в правильном разделе.

Но из что я вижу, вы предоставляете правильный Serdes для перераспределения (ей) в методе Joined.

Joined.with(Serdes.String(),
            new SchoolSerde(),
            new SchoolSerde())

, но по какой-то причине используется значение по умолчанию Serdes из конфигурации вместо. Вместо сеанса отладки вы можете попробовать это в качестве обходного пути для каждого потока.

leftSource.selectKey((key, value) -> value.getName() )
           .through("left-source-repartition", 
                    Produced.with(Serdes.String(), new SchoolSerde())
           .join(
            rightSource.selectKey((key, value) -> value.getName() )
                        .through("right-source-repartition", 
                                Produced.with(Serdes.String(), new SchoolSerde()),
.....

Этот обходной путь должен возобновить работу. РЕДАКТИРОВАТЬ: я забыл упомянуть, что вам нужно создать темы для операции through, а новые темы должны иметь

  1. Одинаковое количество разделов каждый
  2. В как минимум такое же количество разделов, как и в исходных темах, в идеале то же самое число.

Какую версию Kafka Streams вы используете?

Дайте мне знать, как это происходит.

...