KStreams - Агрегирование и groupByKey гарантируют заказ? - PullRequest
0 голосов
/ 06 октября 2019

Представьте себе ситуацию, когда у меня есть тема со многими статусами, каждый в другом журнале с тем же ключом. Порядок этих статусов известен и всегда одинаков, например: создать, оплатить, доставить и т. Д.

Таким образом, если заказ такой же и известен (и сначала создается статус) в коде ниже profileEvent.parcelEvent всегда долженсуществовать. Но этого не произошло, я получаю исключение нулевого указателя в другом, если (оплачено) или еще, если (доставлено) раздел. Это доказательство того, что созданный раздел не выполняется первым.

Вопрос в том, почему это происходит при условии правильного порядка сообщений?

                .groupByKey()
            .aggregate(
                    CustomerProfileEvent::new,
                    (key, value, profileEvent) -> {
                        if (created) {
                            Parcel parcel = value.getParcel();

                            return CustomerProfileEvent.newBuilder()
                                    .setEventTimestamp(value.getHeader().getTimestamp())
                                    .setParcelEvent(CustomerProfileParcelEvent.newBuilder()
                                            .setEventName(parcelCreated)
                                            .setParcelCode(parcel.getParcelCode().toString())
                                            .build())
                                    .build();
                        }else if(paid){
                            do some action on profileEvent.parcelEvent
                        }else if(deliverd){
                            do some action on profileEvent.parcelEvent
                        }

                        return profileEvent;
                    },
                    Materialized.as(Stores.persistentKeyValueStore(applicationConfig.getParcelEventAggregationTopicName()))
                            .withKeySerde((Serde) Serdes.String())
                            .withValueSerde(avroSerdeHelper.createCustomerProfileEventSerde())
                            .withCachingDisabled()
            );
...