Внутреннее соединение KStream-KStream на основе сообщений с совпадающим составным ключом - PullRequest
4 голосов
/ 07 июня 2019

Я пытаюсь выполнить внутреннее соединение (inner join) между двумя KStream. Я заметил, что соединение не работает, когда сообщения обоих KStream имеют составные ключи (например, Java-POJO с несколькими атрибутами), даже если в POJO, используемом в качестве составного ключа, реализованы оба метода — hashCode() и equals(Object o).

UniqueIdKey.java

/**
 * Key type that wraps a single integer identifier.
 *
 * <p>Exposes the identifier as the JSON property {@code "id"} and defines
 * value-based {@code equals}/{@code hashCode} over that identifier.
 */
public class UniqueIdKey {

    private int id;

    /** Creates a key whose identifier keeps the Java default value of {@code 0}. */
    public UniqueIdKey() {
    }

    /**
     * Creates a key for the given identifier.
     *
     * @param id the identifier to wrap
     */
    public UniqueIdKey(int id) {
        this.id = id;
    }

    /** @return the wrapped identifier */
    @JsonGetter("id")
    public int getId() {
        return this.id;
    }

    /** @param id the new identifier */
    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }

    /** @return a debug representation of the form {@code UniqueIdKey{id=N}} */
    @Override
    public String toString() {
        return "UniqueIdKey{id=" + this.id + "}";
    }

    /**
     * Two keys are equal when they are of exactly the same class and carry the same identifier.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null) {
            return false;
        }
        if (o.getClass() != getClass()) {
            return false;
        }
        final UniqueIdKey other = (UniqueIdKey) o;
        return other.id == this.id;
    }

    /** @return a hash derived from the identifier only, consistent with {@link #equals(Object)} */
    @Override
    public int hashCode() {
        return Objects.hash(this.id);
    }
}

Внутреннее объединение работает нормально, когда оба KStreams имеют сообщения с простыми примитивными ключами (например, String, int, double)

Я использую последнюю версию Spring Cloud Stream (Greenwich.SR1) с kafka-clients и kafka-streams версии 2.2.1.

MainApplication.java

/**
 * Spring Boot application that wires three windowed KStream-KStream joins between the
 * {@code person} and {@code school} inputs, re-keying both sides by their {@code id} first.
 */
@SpringBootApplication
public class KafkaStreamsTableJoin {

    /** Starts the Spring application context. */
    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsTableJoin.class, args);
    }

    /** Stream listener bound to the inputs declared by {@link KStreamProcessorX}. */
    @EnableBinding(KStreamProcessorX.class)
    public static class KStreamToTableJoinApplication {

        /**
         * Builds three joins over the same two inputs, each with a 5 second join window:
         * two keyed by a {@link UniqueIdKey} POJO and one keyed by the plain integer id.
         * Every re-keyed record and every joined pair is printed to standard output.
         *
         * @param persons records read from the {@code person} binding
         * @param schools records read from the {@code school} binding
         */
        @StreamListener
        public void process(@Input("person") KStream<PersonKey, Person> persons,
                            @Input("school") KStream<SchoolKey, School> schools) {

            // Join 1: composite POJO key (UniqueIdKey) with an inline joiner.
            KStream<UniqueIdKey, Person> personsByPojoKey1 = persons
                    .selectKey((personKey, person) -> new UniqueIdKey(personKey.getId()))
                    .peek((key, value) -> System.out.println("Personkey1= " + key + ", PersonValue1= " + value));
            KStream<UniqueIdKey, School> schoolsByPojoKey1 = schools
                    .selectKey((schoolKey, school) -> new UniqueIdKey(schoolKey.getId()))
                    .peek((key, value) -> System.out.println("SchoolKey1= " + key + ", SchoolValue1= " + value));
            personsByPojoKey1.join(
                    schoolsByPojoKey1,
                    (person, school) -> {
                        // Reported by the author as never being invoked.
                        System.out.println("person1= " + person + ", school1= " + school);
                        return null;
                    },
                    JoinWindows.of(Duration.ofSeconds(5)),
                    Joined.with(new UniqueIdKeySerde(), new PersonSerde(), new SchoolSerde()));

            // Join 2: plain integer key with an inline joiner.
            KStream<Integer, Person> personsByIntKey = persons
                    .selectKey((personKey, person) -> personKey.getId())
                    .peek((key, value) -> System.out.println("Personkey2= " + key + ", PersonValue2= " + value));
            KStream<Integer, School> schoolsByIntKey = schools
                    .selectKey((schoolKey, school) -> schoolKey.getId())
                    .peek((key, value) -> System.out.println("Schoolkey2= " + key + ", SchoolValue2= " + value));
            personsByIntKey.join(
                    schoolsByIntKey,
                    (person, school) -> {
                        // Reported by the author as working.
                        System.out.println("person2= " + person + ", school2= " + school);
                        return null;
                    },
                    JoinWindows.of(Duration.ofSeconds(5)),
                    Joined.with(Serdes.Integer(), new PersonSerde(), new SchoolSerde()));

            // Join 3: composite POJO key (UniqueIdKey) with the standalone Joiner class.
            KStream<UniqueIdKey, Person> personsByPojoKey3 = persons
                    .selectKey((personKey, person) -> new UniqueIdKey(personKey.getId()))
                    .peek((key, value) -> System.out.println("Personkey3= " + key + ", PersonValue3= " + value));
            KStream<UniqueIdKey, School> schoolsByPojoKey3 = schools
                    .selectKey((schoolKey, school) -> new UniqueIdKey(schoolKey.getId()))
                    .peek((key, value) -> System.out.println("SchoolKey3= " + key + ", SchoolValue3= " + value));
            personsByPojoKey3.join(
                    schoolsByPojoKey3,
                    new Joiner(), // Reported by the author as never being invoked.
                    JoinWindows.of(Duration.ofSeconds(5)),
                    Joined.with(new UniqueIdKeySerde(), new PersonSerde(), new SchoolSerde()));
        }
    }

    /** Declares the two input bindings consumed by the listener. */
    interface KStreamProcessorX {

        /** @return the stream bound to the {@code person} input */
        @Input("person")
        KStream<?, ?> inputPersonKStream();

        /** @return the stream bound to the {@code school} input */
        @Input("school")
        KStream<?, ?> inputSchoolKStream();
    }
}

Joiner.java

/**
 * {@link ValueJoiner} that prints each joined person/school pair and yields no value.
 *
 * <p>NOTE(review): the result type argument {@code Null} is not a {@code java.lang} type and is
 * presumably imported from elsewhere; {@code Void} is the conventional placeholder — confirm.
 */
public class Joiner implements ValueJoiner<Person, School, Null> {

    /**
     * Prints the joined pair to standard output.
     *
     * @param person the left-hand value of the join
     * @param school the right-hand value of the join
     * @return always {@code null}
     */
    @Override
    public Null apply(Person person, School school) {
        final String line = "Joiner person3= " + person + " ,Joiner school3= " + school;
        System.out.println(line);
        return null;
    }
}

Person.java

/**
 * Value type carrying a person's age, exposed as the JSON property {@code "age"}.
 */
public class Person {

    private double age;

    /** Creates a person whose age keeps the Java default value of {@code 0.0}. */
    public Person() {
    }

    /**
     * Creates a person with the given age.
     *
     * @param age the age to store
     */
    public Person(double age) {
        this.age = age;
    }

    /** @return the stored age */
    @JsonGetter("age")
    public double getAge() {
        return this.age;
    }

    /** @param age the new age */
    @JsonSetter("age")
    public void setAge(double age) {
        this.age = age;
    }

    /** @return a debug representation of the form {@code Person{age=N}} */
    @Override
    public String toString() {
        return "Person{age=" + this.age + "}";
    }
}

PersonKey.java

/**
 * Key type for person records: first name, last name and an integer id, each exposed as a JSON
 * property of the same name. Equality and hashing cover all three fields.
 */
public class PersonKey {

    private String firstName;
    private String lastName;
    private int id;

    /** Creates a key with all fields at their Java default values. */
    public PersonKey() {
    }

    /**
     * Creates a fully populated key.
     *
     * @param firstName the first name
     * @param lastName  the last name
     * @param id        the integer identifier
     */
    public PersonKey(String firstName, String lastName, int id) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.id = id;
    }

    /** @return the first name */
    @JsonGetter("firstName")
    public String getFirstName() {
        return this.firstName;
    }

    /** @param firstName the new first name */
    @JsonSetter("firstName")
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    /** @return the last name */
    @JsonGetter("lastName")
    public String getLastName() {
        return this.lastName;
    }

    /** @param lastName the new last name */
    @JsonSetter("lastName")
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    /** @return the integer identifier */
    @JsonGetter("id")
    public int getId() {
        return this.id;
    }

    /** @param id the new integer identifier */
    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }

    /** @return a debug representation listing all three fields */
    @Override
    public String toString() {
        return "PersonKey{firstName='" + this.firstName
                + "', lastName='" + this.lastName
                + "', id=" + this.id
                + "}";
    }

    /**
     * Two keys are equal when they are of exactly the same class and all three fields match.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final PersonKey other = (PersonKey) o;
        if (this.id != other.id) {
            return false;
        }
        if (!Objects.equals(this.firstName, other.firstName)) {
            return false;
        }
        return Objects.equals(this.lastName, other.lastName);
    }

    /** @return a hash over first name, last name and id, consistent with {@link #equals(Object)} */
    @Override
    public int hashCode() {
        return Objects.hash(this.firstName, this.lastName, this.id);
    }
}

School.java

/**
 * Value type carrying a school's address, exposed as the JSON property {@code "address"}.
 */
public class School {

    private String address;

    /** Creates a school whose address is {@code null}. */
    public School() {
    }

    /**
     * Creates a school with the given address.
     *
     * @param address the address to store
     */
    public School(String address) {
        this.address = address;
    }

    /** @return the stored address */
    @JsonGetter("address")
    public String getAddress() {
        return this.address;
    }

    /** @param address the new address */
    @JsonSetter("address")
    public void setAddress(String address) {
        this.address = address;
    }

    /** @return a debug representation of the form {@code School{address='...'}} */
    @Override
    public String toString() {
        return "School{address='" + this.address + "'}";
    }
}

SchoolKey.java

/**
 * Key type for school records: name, country, city and an integer id, each exposed as a JSON
 * property of the same name. Equality and hashing cover all four fields.
 */
public class SchoolKey {

    private String name;
    private String country;
    private String city;
    private int id;

    /** Creates a key with all fields at their Java default values. */
    public SchoolKey() {
    }

    /**
     * Creates a fully populated key.
     *
     * @param name    the school name
     * @param country the country
     * @param city    the city
     * @param id      the integer identifier
     */
    public SchoolKey(String name, String country, String city, int id) {
        this.name = name;
        this.country = country;
        this.city = city;
        this.id = id;
    }

    /** @return the school name */
    @JsonGetter("name")
    public String getName() {
        return this.name;
    }

    /** @param name the new school name */
    @JsonSetter("name")
    public void setName(String name) {
        this.name = name;
    }

    /** @return the country */
    @JsonGetter("country")
    public String getCountry() {
        return this.country;
    }

    /** @param country the new country */
    @JsonSetter("country")
    public void setCountry(String country) {
        this.country = country;
    }

    /** @return the city */
    @JsonGetter("city")
    public String getCity() {
        return this.city;
    }

    /** @param city the new city */
    @JsonSetter("city")
    public void setCity(String city) {
        this.city = city;
    }

    /** @return the integer identifier */
    @JsonGetter("id")
    public int getId() {
        return this.id;
    }

    /** @param id the new integer identifier */
    @JsonSetter("id")
    public void setId(int id) {
        this.id = id;
    }

    /** @return a debug representation listing all four fields */
    @Override
    public String toString() {
        return "SchoolKey{name='" + this.name
                + "', country='" + this.country
                + "', city='" + this.city
                + "', id=" + this.id
                + "}";
    }

    /**
     * Two keys are equal when they are of exactly the same class and all four fields match.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final SchoolKey other = (SchoolKey) o;
        if (this.id != other.id) {
            return false;
        }
        if (!Objects.equals(this.name, other.name)) {
            return false;
        }
        if (!Objects.equals(this.country, other.country)) {
            return false;
        }
        return Objects.equals(this.city, other.city);
    }

    /** @return a hash over name, country, city and id, consistent with {@link #equals(Object)} */
    @Override
    public int hashCode() {
        return Objects.hash(this.name, this.country, this.city, this.id);
    }
}

Оба KStream получают данные из топиков «person» и «school» соответственно. Сообщения о людях и о школах имеют одинаковый идентификатор (id), по которому и выполняется внутреннее соединение.

person.topic

CreateTime:1559902106959-{"firstName":"JONH","lastName":"wICK","id":1}-{"age":34.0}
CreateTime:1559902106986-{"firstName":"Harley","lastName":"valla","id":2}-{"age":42.0}
CreateTime:1559902106991-{"firstName":"Mike","lastName":"PENCE","id":3}-{"age":23.0}
CreateTime:1559902106996-{"firstName":"Ali","lastName":"Akbar","id":4}-{"age":53.0}
CreateTime:1559902107000-{"firstName":"Arslan","lastName":"Akhtar","id":5}-{"age":53.0}
CreateTime:1559902107005-{"firstName":"Will","lastName":"David","id":6}-{"age":13.0}
CreateTime:1559902107009-{"firstName":"Beoionca","lastName":"Christ","id":7}-{"age":64.0}

school.topic

CreateTime:1559902107055-{"name":"BMIA","country":"PK","city":"Islamabad","id":1}-{"address":"Sector F/8"}
CreateTime:1559902107068-{"name":"CMII","country":"Hk","city":"Rawalpindi","id":2}-{"address":"Sector G/8"}
CreateTime:1559902107073-{"name":"SCSV","country":"USA","city":"Lahore","id":3}-{"address":"Sector H/8"}
CreateTime:1559902107079-{"name":"NVS","country":"SW","city":"Faisalbad","id":4}-{"address":"Sector J/8"}
CreateTime:1559902107082-{"name":"SNVJ","country":"CH","city":"Shikarpur","id":5}-{"address":"Sector C/8"}
CreateTime:1559902107088-{"name":"DBJ","country":"CN","city":"Talaqand","id":6}-{"address":"Sector Z/8"}
CreateTime:1559902107092-{"name":"SCNJ","country":"SE","city":"Karachi","id":7}-{"address":"Sector S/8"}

Результат вывода на консоль

Personkey1= UniqueIdKey{id=1}, PersonValue1= Person{age=34.0}
Personkey2= 1, PersonValue2= Person{age=34.0}
Personkey3= UniqueIdKey{id=1}, PersonValue3= Person{age=34.0}
SchoolKey1= UniqueIdKey{id=1}, SchoolValue1= School{address='Sector F/8'}
Schoolkey2= 1, SchoolValue2= School{address='Sector F/8'}
SchoolKey3= UniqueIdKey{id=1}, SchoolValue3= School{address='Sector F/8'}
Personkey1= UniqueIdKey{id=2}, PersonValue1= Person{age=42.0}
Personkey2= 2, PersonValue2= Person{age=42.0}
Personkey3= UniqueIdKey{id=2}, PersonValue3= Person{age=42.0}
SchoolKey1= UniqueIdKey{id=2}, SchoolValue1= School{address='Sector G/8'}
Schoolkey2= 2, SchoolValue2= School{address='Sector G/8'}
SchoolKey3= UniqueIdKey{id=2}, SchoolValue3= School{address='Sector G/8'}
Personkey1= UniqueIdKey{id=3}, PersonValue1= Person{age=23.0}
Personkey2= 3, PersonValue2= Person{age=23.0}
Personkey3= UniqueIdKey{id=3}, PersonValue3= Person{age=23.0}
SchoolKey1= UniqueIdKey{id=3}, SchoolValue1= School{address='Sector H/8'}
Schoolkey2= 3, SchoolValue2= School{address='Sector H/8'}
SchoolKey3= UniqueIdKey{id=3}, SchoolValue3= School{address='Sector H/8'}
Personkey1= UniqueIdKey{id=4}, PersonValue1= Person{age=53.0}
Personkey2= 4, PersonValue2= Person{age=53.0}
Personkey3= UniqueIdKey{id=4}, PersonValue3= Person{age=53.0}
SchoolKey1= UniqueIdKey{id=4}, SchoolValue1= School{address='Sector J/8'}
Schoolkey2= 4, SchoolValue2= School{address='Sector J/8'}
SchoolKey3= UniqueIdKey{id=4}, SchoolValue3= School{address='Sector J/8'}
Personkey1= UniqueIdKey{id=5}, PersonValue1= Person{age=53.0}
Personkey2= 5, PersonValue2= Person{age=53.0}
Personkey3= UniqueIdKey{id=5}, PersonValue3= Person{age=53.0}
SchoolKey1= UniqueIdKey{id=5}, SchoolValue1= School{address='Sector C/8'}
Schoolkey2= 5, SchoolValue2= School{address='Sector C/8'}
SchoolKey3= UniqueIdKey{id=5}, SchoolValue3= School{address='Sector C/8'}
Personkey1= UniqueIdKey{id=6}, PersonValue1= Person{age=13.0}
Personkey2= 6, PersonValue2= Person{age=13.0}
Personkey3= UniqueIdKey{id=6}, PersonValue3= Person{age=13.0}
SchoolKey1= UniqueIdKey{id=6}, SchoolValue1= School{address='Sector Z/8'}
Schoolkey2= 6, SchoolValue2= School{address='Sector Z/8'}
SchoolKey3= UniqueIdKey{id=6}, SchoolValue3= School{address='Sector Z/8'}
Personkey1= UniqueIdKey{id=7}, PersonValue1= Person{age=64.0}
Personkey2= 7, PersonValue2= Person{age=64.0}
Personkey3= UniqueIdKey{id=7}, PersonValue3= Person{age=64.0}
SchoolKey1= UniqueIdKey{id=7}, SchoolValue1= School{address='Sector S/8'}
Schoolkey2= 7, SchoolValue2= School{address='Sector S/8'}
SchoolKey3= UniqueIdKey{id=7}, SchoolValue3= School{address='Sector S/8'}
person2= Person{age=34.0}, school2= School{address='Sector F/8'}
person2= Person{age=42.0}, school2= School{address='Sector G/8'}
person2= Person{age=23.0}, school2= School{address='Sector H/8'}
person2= Person{age=53.0}, school2= School{address='Sector J/8'}
person2= Person{age=53.0}, school2= School{address='Sector C/8'}
person2= Person{age=13.0}, school2= School{address='Sector Z/8'}
person2= Person{age=64.0}, school2= School{address='Sector S/8'}

UniqueIdKeySerde.java

import kafka.streams.join.UniqueIdKey;
import org.apache.kafka.common.serialization.Serdes;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

/**
 * Serde for {@link UniqueIdKey} that pairs a {@link JsonSerializer} with a
 * {@link JsonDeserializer} targeting {@code UniqueIdKey}.
 */
public class UniqueIdKeySerde extends Serdes.WrapperSerde<UniqueIdKey> {

    /** Creates the serde with a fresh serializer and a deserializer bound to {@code UniqueIdKey.class}. */
    public UniqueIdKeySerde() {
        super(new JsonSerializer<>(), new JsonDeserializer<>(UniqueIdKey.class));
    }
}

Пример spring-cloud-stream приложения с воспроизводимыми шагами для отладки

Ответы [ 2 ]

0 голосов
/ 10 июня 2019

Пример Serde:

/**
 * Serde for {@link StateProvinceKey}, obtained by subclassing {@code JsonSerde} with the key
 * class as its target type.
 *
 * <p>NOTE(review): {@code JsonSerde} is presumably spring-kafka's JSON serde — confirm against
 * the file's imports.
 */
public class StateProvinceKeySerde extends JsonSerde<StateProvinceKey> {

  /** Creates the serde bound to {@code StateProvinceKey.class}. */
  public StateProvinceKeySerde() {
    super(StateProvinceKey.class);
  }
}

Пример ключа:

public class StateProvinceKey {

  private String stateCode;
  private String countryCodeAlpha2;

  public IBMStateProvinceKey() {

  }

  public StateProvinceKey(String stateCode, String countryCodeAlpha2) {
    this.stateCode = stateCode;
    this.countryCodeAlpha2 = countryCodeAlpha2;
  }

  public String getStateCode() {
    return stateCode;
  }

  public void setStateCode(String stateCode) {
    this.stateCode = stateCode;
  }

  public String getCountryCodeAlpha2() {
    return countryCodeAlpha2;
  }

  public void setCountryCodeAlpha2(String countryCodeAlpha2) {
    this.countryCodeAlpha2 = countryCodeAlpha2;
  }

  public byte[] serialize(){
    ObjectMapper objectMapper = new ObjectMapper();
    try {
      return objectMapper.writeValueAsBytes(this);
    } catch (JsonProcessingException e) {
    }
    return new byte[0];
  }
}
0 голосов
/ 09 июня 2019

Когда Kafka Streams вычисляет соединение, он сравнивает не Java-объекты ключей, а массивы byte[], то есть сериализованные ключи. Следовательно, equals() и hashCode() не используются.

Вам необходимо убедиться, что используемый сериализатор записывает совпадающие массивы byte[] для ключей, чтобы объединение работало.

...