Потоки Кафки | Агрегация присоединенного потока - PullRequest
0 голосов
/ 22 апреля 2020

Я хочу объединить два потока topi c (левое соединение) и выполнить агрегирование на основе окон по объединенному потоку. Однако при объединении некоторые сообщения считаются дважды, так как при соединении некоторые сообщения отправляются дважды, в зависимости от задержки в правом верхнем углу c. Ниже приведен код для заказа на поставку C.

        StreamsBuilder builder = new StreamsBuilder();
        KStream<Long, BidMessage> bidStream = builder.stream("bid", Consumed.with(new LongSerde(), new BidMessageSerde()).withTimestampExtractor(new BidMessageTimestampExtractor()));
        KStream<Long, ClickMessage> clickStream = builder.stream("click", Consumed.with(new LongSerde(), new ClickMessageSerde()).withTimestampExtractor(new ClickMessageTimestampExtractor()));

        KStream<String, BidMessage> newBidStream = bidStream.selectKey((key, value) -> value.getRequestId());
        KStream<String, CLickMessage> newClickStream = impStream.selectKey((key, value) -> value.getRequestId());

        KStream<String, BidMergedMessage> result = newBidStream.leftJoin(newImpStream,
                getValueJoiner(),
                JoinWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(0)),
                Joined.with(Serdes.String(), new BidMessageSerde(), new ClickMessageSerde()));

        result.groupBy((key, value) -> "" + value.getClientId(), Grouped.with(Serdes.String(), newBidMergedSerde()))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(30)).grace(Duration.ofSeconds(40)))
                .aggregate(() -> new AggResult(0, 0), (key, value, aggregate) -> {
                    if (value.getClickId() != null) {
                        aggregate.clicks_++;
                    }
                    aggregate.bids_++;
                    return aggregate;
                }, Materialized.with(Serdes.String(),new AggResultJsonSerde()))
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                .foreach((key, value) -> {
                    logger.info("{}-{}, clientId : {}, Value: {}", new Date(key.window().start()), new Date(key.window().end()),key.key(), value);
                });

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

Можно ли исправить это, чтобы избежать дубликатов из-за объединения?

Ответы [ 2 ]

1 голос
/ 24 апреля 2020

Я не уверен, что полностью следую вашим требованиям, но, похоже, проблема в том, что вы хотите считать ставки только , если не было слияния с RHS или если соединение прошло успешно. Но из-за некоторой медлительности в RHS topi c вы иногда получаете два результата: сначала без слияния, а затем с последующим слиянием при поступлении записи RHS.

Вы можете добавить оператор TransformValues в result KStream и используйте хранилище состояний для отслеживания поступающих записей. Если у вас есть дубликат, полученный от успешного объединения, вы можете просмотреть хранилище состояний и удалить запись с нулевым RHS, если она существует, затем переслать правильный результат соединения.

Чтобы переслать записи, которые никогда не приводили к успешному объединению, вы можете рассмотреть возможность использования punctuate() для периодического go через хранилище и выдачу записей, которые не соответствуют и находились в хранилище состояний за пределами время, когда вы чувствуете, что соединение должно было произойти.

Этот учебник от Kafka Tutorials может также послужить руководством.

0 голосов
/ 22 апреля 2020

Я могу придумать два варианта.

  1. Первое, что нужно попробовать, это добавить filter в поток KStream<String, BidMergedMessage> result. Я предполагаю, что по объекту, возвращенному из ValueJoiner, можно сказать, что значение RHS равно нулю.

  2. Использовать внутреннее соединение newBidStream.join(newImpStream...

-Bill

...