Не строковые заголовки kafka-сообщения всегда равны нулю.Заголовки сообщений типа String типа данных извлекаются правильно.
package com.test.KafkaSimpleProject;
import org.apache.camel.CamelContext;
import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
/**
* Hello world!
*
*/
public class App2 {
public static void main(String[] args) throws Exception {
CamelContext context = new DefaultCamelContext();
try {
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
//producer route
from("timer:foo?period=3000&repeatCount=1000&delay=3000")
.setBody()
.constant("This is kafka message")
.setHeader("name")
.simple("ApacheCamelkafka", String.class)
.setHeader("contact")
.simple("123123", Integer.class)
.setHeader("trueFalse")
.simple("false", Boolean.class)
.to("kafka:test?brokers=localhost:9092");
//Consumer route
from("kafka:test?brokers=localhost:9092")
.process(new org.apache.camel.Processor() {
public void process(org.apache.camel.Exchange exchange)
throws Exception {
Message in = exchange.getIn();
String name = in .getHeader("name", String.class);
Integer contact = in .getHeader("contact", Integer.class);
Boolean trueFalse = in .getHeader("trueFalse", Boolean.class);
System.out.println("Name:" + name);
System.out.println("contact:" + contact);
System.out.println("trueFalse:" + trueFalse);
}
});
}
});
context.start();
Thread.sleep(20000000);
} finally {
context.stop();
}
}
}
Ожидаемый вывод:
Имя: ApacheCamelkafka
contact: 123123
trueFalse: false
Фактический вывод:
Имя: ApacheCamelkafka
контакт: ноль
trueFalse: ноль