Поддержка заголовков потоков Kafka - PullRequest
0 голосов
/ 16 февраля 2019

В нашем приложении производитель отправляет разные типы данных, и может случиться так, что раздел может иметь разные объекты типа данных, так как мы не хотим разделять на основе типа данных.

В потоках kafka я пыталсяиспользовать заголовки.Producer добавляет заголовок в BytesObject и отправляет данные в kafka.

Например, заголовок, определенный тип данных (customObject).Теперь, основываясь на заголовке, я хочу разобрать десериализацию объекта BytesObject, полученного в потоках kafka, но я ограничен использованием processorInterface, где мне нужно передать фактический десериализатор

Есть ли способ, которым мне не нужно предварительно указывать десериализациюзатем на основе заголовка в ProcessContext для полученной записи я могу десериализовать объекты

public class StreamHeaderProcessor extends AbstractProcessor<String, Bytes>{


@Override
public void process(String key, Bytes value) {
    Iterator<Header> it = context().headers().iterator();
    while(it.hasNext()) {
        Header head = it.next();
        if(head.key().equals("dataType")) {
            String headerValue = new String(head.value());
            if(headerValue.equals("X")) {

            }else if(headerValue.equals("Y")) {

            }
        }
    }
}

}

1 Ответ

0 голосов
/ 17 февраля 2019

Если вы не задаете Serdes в StreamsConfig и не устанавливаете Serdes в builder.stream(..., Consumed.with(/*Serdes*/)) Kafka Streams будет использовать ByteArraySerde по умолчанию и, таким образом, ключ и значение копируются в массивы byte[] в качестве типов данных.(Аналогично для использования Processor API и не устанавливайте Serde на topology.addSource(...).)

Таким образом, вы можете применить Processor или Transformer к потоку данных, проверить заголовок и вызватьсоответствующий десериализатор в вашем собственном коде.Вам необходимо заранее знать все возможные типы данных.

public class MyProcessor implements Processor {
    // add corresponding deserializers for all expected types (eg, String)
    private StringDeserializer stringDeserializer = new StringDeserializer();

    // other methods omitted

    void process(byte[] key, byte[] value) {
        // inspect header
        if (header.equals("StringType") {
            // get `context` via `init()` method
            String stringValue = stringDeserializer.deserialize(context.topic(), value);
            // similar for `key`

            // apply processing logic for String type
        }
    }

}
...