Поведение десериализации на KStream-KTable left Join - PullRequest
0 голосов
/ 17 мая 2019

Используя DSL Kafka Streams, я десериализирую событие в bean-компонент, который затем помещаю в «EventCarrier».KStream обрабатывает событие.

EventCarrier позволяет мне записывать любые исключения, возникающие во время обработки.Я делю поток на потоки успеха и неудач на основе наличия или отсутствия исключения.

Модель работает хорошо, пока я не оставлю свой поток в KTable.В этот момент кажется, что Кафка отбрасывает EventCarrier и создает новый, снова десериализовав исходное событие.Любое исключение, которое было сохранено в исходном EventCarrier, потеряно.

Ожидается ли десериализация в этом сценарии, или я что-то не так делаю?

 KStream<String, EventCarrier> notificationStream[] = myStream
    .filter((key, carrier) -> shouldNotify(carrier.getEvent()))
    .selectKey((key, carrier) -> carrier.getEvent().getGuid())
    .peek((key, carrier) -> carrier.recordException(new Exception("Ouch!!")))
    .peek((key, carrier) -> System.out.println("1:" + carrier.getException()))
    .leftJoin(notificationsTable, (carrier, notification) -> 
         recordWhetherNotificationFound(carrier, notification))
    .peek((key, carrier) -> System.out.println("2:" + carrier.getException()))

и метод

    private ProductEventCarrier recordWhetherNotificationFound(ProductEventCarrier carrier, NotificationEvent notification) {
            carrier.setNotificationFound(nonNull(notification));
        return carrier;
    }

В приведенном выше коде исключение печатается, как и ожидалось, в первом println, но во втором - пустое.

...