Обновить KTable на основе частичных атрибутов данных - PullRequest
1 голос
/ 20 ноября 2019

Я пытаюсь обновить KTable частичными данными объекта. Например. Пользовательский объект {"id":1, "name":"Joe", "age":28} Объект передается в тему и группируется по ключу в KTable. Теперь пользовательский объект частично обновляется следующим образом {"id":1, "age":33} и передается в таблицу. Но обновленная таблица выглядит следующим образом {"id":1, "name":null, "age":28}. Ожидаемый выход {"id":1, "name":"Joe", "age":33}. Как я могу использовать потоки Кафки и весенние облачные потоки для достижения ожидаемого результата. Мы ценим любые предложения. . Спасибо

1006 * Вот код
 @Bean
        public Function<KStream<String, User>, KStream<String, User>> process() {
            return input -> input.map((key, user) -> new KeyValue<String, User>(user.getId(), user))
                    .groupByKey(Grouped.with(Serdes.String(), new JsonSerde<>(User.class))).reduce((user1, user2) -> {
                        user1.merge(user2);
                        return user1;
                    }, Materialized.as("allusers")).toStream();
        }

и модифицированный объект пользователя с ниже код:

    public void merge(Object newObject) {
        assert this.getClass().getName().equals(newObject.getClass().getName());
        for (Field field : this.getClass().getDeclaredFields()) {
            for (Field newField : newObject.getClass().getDeclaredFields()) {
                if (field.getName().equals(newField.getName())) {
                    try {
                        field.set(this, newField.get(newObject) == null ? field.get(this) : newField.get(newObject));
                    } catch (IllegalAccessException ignore) {
                    }
                }
            }
        }
    }

Является ли это правильный подход, или любой другой подход в KStreams

Ответы [ 2 ]

2 голосов
/ 22 ноября 2019

Я протестировал ваш код слияния, и он, кажется, работает как положено. Но так как ваш результат после reduce равен {"id":1, "name":null, "age":28}, я могу думать о двух вещах:

  • Ваше состояние вообще не обновляется, поскольку ни один атрибут не изменился.
  • Может быть, у вас есть проблема с сериализацией, поскольку строковый атрибут имеет значение null, но другие атрибуты int хороши.

Я предполагаю, что это так, потому что вы изменяете исходный объект и возвращаете то же значениеПотоки kafka не распознают это как изменение и не сохранят новое состояние. На самом деле, вы не должны мутировать свой объект, так как это может привести к недетерминированности в зависимости от вашего конвейера.

Попробуйте изменить свою функцию merge, чтобы создать новый объект User, и посмотрите,изменения поведения.

0 голосов
/ 24 ноября 2019

Итак, вот рекомендуемый общий подход для объединения двух объектов, пожалуйста, прокомментируйте здесь. Чтобы это работало, объединяемый объект должен иметь пустой конструктор.

     public <T> T mergeObjects(T first, T second) {
        Class<?> clazz = first.getClass();
        Field[] fields = clazz.getDeclaredFields();
        Object newObject = null;
        try {
            newObject = clazz.getDeclaredConstructor().newInstance();
            for (Field field : fields) {
                field.setAccessible(true);
                Object value1 = field.get(first);
                Object value2 = field.get(second);
                Object value = (value2 == null) ? value1 : value2;
                field.set(newObject, value);
            }
        } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
                | InvocationTargetException | NoSuchMethodException | SecurityException e) {

            e.printStackTrace();
        }
        return (T) newObject;
    }
...