Как захватить записи kafka, которые не соответствуют условию объединения потока kafka? - PullRequest
0 голосов
/ 20 июня 2019

Я занимаюсь обогащением данных, присоединив kstream к ktable.Kstream содержит сообщения, отправленные транспортными средствами, а ktable содержит данные транспортных средств.У меня проблема в том, что я хочу захватить сообщение из потока, у которого нет соответствующего ключа соединения в таблице.Поток Kafka молча пропускает записи о том, что у них нет совпадений.Есть ли способ отправить эти записи в другую тему, чтобы их можно было обработать позже?

StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, VinMappingInfo> vinMappingTable = builder.table(vinInfoTopic, Consumed.with(Serdes.String(), valueSerde));
        KStream<String, VehicleMessage> vehicleStream = builder.stream(sourceTopic);
        vehicleStream.join(vinMappingTable, (vehicleMsg, vinInfo) -> {
            log.info("joining {} with vin info {}", vehicleMsg.getPayload().getId(), vinInfo.data.vin);
            vehicleMsg.setVin(vinInfo.data.vin);
            return vehicleMsg;
        }, Joined.with(null, null, valueSerde))
                .to(destinationTopic);

        final Topology topology = builder.build();
        log.info("The topology of connected processor nodes: \n {}", topology.describe());
        KafkaStreams streams = new KafkaStreams(topology, config);
        streams.cleanUp();
        streams.start();

1 Ответ

0 голосов
/ 25 июня 2019

Вы можете использовать левое соединение:

stream.leftJoin(table,...);

Это гарантирует, что все записи из входного потока находятся в выходном потоке.В этом случае ValueJoiner будет вызываться с apply(streamValue, null).

...