Используя DSL Kafka Streams, я десериализирую событие в bean-компонент, который затем помещаю в «EventCarrier».KStream обрабатывает событие.
EventCarrier позволяет мне записывать любые исключения, возникающие во время обработки.Я делю поток на потоки успеха и неудач на основе наличия или отсутствия исключения.
Модель работает хорошо, пока я не оставлю свой поток в KTable.В этот момент кажется, что Кафка отбрасывает EventCarrier и создает новый, снова десериализовав исходное событие.Любое исключение, которое было сохранено в исходном EventCarrier, потеряно.
Ожидается ли десериализация в этом сценарии, или я что-то не так делаю?
KStream<String, EventCarrier> notificationStream[] = myStream
.filter((key, carrier) -> shouldNotify(carrier.getEvent()))
.selectKey((key, carrier) -> carrier.getEvent().getGuid())
.peek((key, carrier) -> carrier.recordException(new Exception("Ouch!!")))
.peek((key, carrier) -> System.out.println("1:" + carrier.getException()))
.leftJoin(notificationsTable, (carrier, notification) ->
recordWhetherNotificationFound(carrier, notification))
.peek((key, carrier) -> System.out.println("2:" + carrier.getException()))
и метод
private ProductEventCarrier recordWhetherNotificationFound(ProductEventCarrier carrier, NotificationEvent notification) {
carrier.setNotificationFound(nonNull(notification));
return carrier;
}
В приведенном выше коде исключение печатается, как и ожидалось, в первом println, но во втором - пустое.