У меня есть случай использования, когда данное сообщение, которое генерируется внешним клиентом, должно пройти несколько этапов обогащения и т. Д. Через потоки Kafka, прежде чем сообщение будет наконец передано другому целевому кластеру Kafka.У меня есть потребность в полной прослеживаемости реализации, а также согласовании.
Для моего варианта использования я пытаюсь использовать следующий подход:
- Используйте перехватчики-производители, чтобы ввести уникальное сообщение, которое я бы поместил в заголовок сообщения.
- Пустьпроход заголовка через поток обработки
- Используйте другой перехватчик производителя, прежде чем сообщение будет записано в целевой кластер (второй кластер).
У меня проблема в том, что я не могу связать RecordMetadata с идентификатором сообщения, который я установил в заголовке.RecordMetadata не содержит никаких проходов через сортировку полей для сообщения или корреляции и т. Д., В то время как я могу сопоставить счетчик, но я не могу сопоставить сообщение, которое я бы.
Есть ли другоеподход к решению этой проблемы?Есть ли в будущем KIP, где есть возможность сохранения сквозных полей в RecordMetadata.