Кафка Потоки, как получить заголовки Кафки - PullRequest
1 голос
/ 17 апреля 2020

У меня ниже код потока kafka

    public class KafkaStreamHandler implements  Processor<String, String>{

    private ProcessorContext context;


        @Override
    public void init(ProcessorContext context) {
        // TODO Auto-generated method stub
        this.context = context;
    }

    public KeyValue<String, KafkaStatusRecordWrapper> process(String key, String value) {

        Headers contexts = context.headers();

        contexts.forEach(header -> System.out.println(header));
     }

public void StartFailstreamHandler() {
       StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> userStream = builder.stream("usertopic",Consumed.with(Serdes.String(), Serdes.String()));
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "failed-streams-userstream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ALL my bootstrap servers);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "500");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        //consumer_timeout_ms
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 2000);

        props.put("state.dir","/tmp/kafka/stat));

     userStream.peek((key,value)->System.out.println("key :"+key+" value :"+value));

     /* take few descsion based on Header */
     /* How to get the Header */ 

       userStream.map(this::process);
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);


kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {

                logger.error("Thread Name :" + t.getName() + " Error while processing:", e);
            }
        });


        kafkaStreams.cleanUp();
        kafkaStreams.start();
    }

    }

А теперь наш клиент отправляет информацию о версии заголовков kafka, как показано ниже.

ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic", 1, "message");
record.headers().add(new RecordHeader("version", "v1".getBytes()));
producer.send(record);

На основании этого заголовка мне нужно выбрать парсер для моего сообщения. Как читать этот заголовок с помощью оператора KStream? Я видел все API потока, но ни один метод не дает заголовок

Я не могу перейти на обычного потребителя kakfa, так как мое приложение уже зависит от нескольких API KStream ..

1 Ответ

2 голосов
/ 17 апреля 2020

Процессор не позволяет вам связывать новый оператор в нисходящем DSL, вы должны использовать transformValues, чтобы использование могло продолжать использовать Stream DSL:

  1. Сначала извлечь заголовки из ValueTransformerWithKey
public class ExtractHeaderThenDoSomethingTransformer implements ValueTransformerWithKey<String, String, String> {

    ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public String transform(String readOnlyKey, String value) {
        Headers headers = context.headers();
        /* take few descsion based on Header: if you want to filter base on then just return null then chaining another filter operator after transformValues*/
        /* How to get the Header */
        return value;
    }

    @Override
    public void close() {

    }
}
Добавьте ExtractHeaderThenDoSomethingTransformer в свою топологию следующим образом:
userStream
        .transformValues(ExtractHeaderThenDoSomethingTransformer::new)
        .map(this::processs);
...