В нашем приложении производитель отправляет разные типы данных, и может случиться так, что раздел может иметь разные объекты типа данных, так как мы не хотим разделять на основе типа данных.
В потоках kafka я пыталсяиспользовать заголовки.Producer добавляет заголовок в BytesObject и отправляет данные в kafka.
Например, заголовок, определенный тип данных (customObject).Теперь, основываясь на заголовке, я хочу разобрать десериализацию объекта BytesObject, полученного в потоках kafka, но я ограничен использованием processorInterface, где мне нужно передать фактический десериализатор
Есть ли способ, которым мне не нужно предварительно указывать десериализациюзатем на основе заголовка в ProcessContext для полученной записи я могу десериализовать объекты
public class StreamHeaderProcessor extends AbstractProcessor<String, Bytes>{
@Override
public void process(String key, Bytes value) {
Iterator<Header> it = context().headers().iterator();
while(it.hasNext()) {
Header head = it.next();
if(head.key().equals("dataType")) {
String headerValue = new String(head.value());
if(headerValue.equals("X")) {
}else if(headerValue.equals("Y")) {
}
}
}
}
}