- Каждое сообщение должно быть обработано и отправлено перед обработкой следующего сообщения с помощью функции потока и без использования другого производителя кафки
Это произойдет в любом случае по умолчанию.
Если требование одного будет выполнено, тогда я смогу отправлять сообщения по темам, которые будут определяться динамически в соответствии с типом.
Во-первых, чтобы упростить этап обработки. события в зависимости от их типа, посмотрите на branch()
. Функция branch()
позволяет предоставлять фиксированное количество предикатов для маршрутизации сообщений в различные подпотоки. Затем вы можете независимо обрабатывать эти подпотоки, например, с помощью функции map()
. Наконец, вы можете затем отправить каждый подпоток в отдельный topi c с помощью to()
.
KStream<String, Event>[] branches = events.branch(
(id, event) -> event.getTransactionValue() >= FRAUD_LIMIT,
(id, event) -> event.getTransactionValue() < FRAUD_LIMIT);
branches[0].map(...).to(suspiciousTransactionsTopicName);
branches[1].map(...).to(validatedTransactionsTopicName);
Вы также можете принимать действительно динамические c решения о маршрутизации в to()
на основе что бы ни было в полезной нагрузке события. Здесь имя выходных данных topi c получено из данных события.
myStream.to(
(eventId, event, record) -> "topic-prefix-" + event.methodOfYourEventLikeGetTypeName()
);
Кроме того, если для решения о маршрутизации dynamici c требуется информация, которая недоступна непосредственно в событии, вы можете воспользоваться одним из вариантов. Нужно динамически обогатить исходное событие информацией, связанной с маршрутизацией (например, путем объединения потока исходных событий с таблицей с информацией, связанной с маршрутизацией), а затем выполнить динамическую c маршрутизацию через to()
. Подробнее см. https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing/.