Я проверял потоки Кафки. Я тестировал приведенный ниже код для потоков Kafka
Тема продюсера: (это первая тема продюсера, которая отправляет нижеприведенные данные json)
KafkaProducer<String, String> producer = new KafkaProducer<>(
properties);
producer.send(new ProducerRecord<String,String>(topic, jsonobject.toString()));
producer.close();
JSON - продюсер по теме:
{"UserID":"1","Address”:”XXX”,”AccountNo":"234234","MemberName”:”Stella”,”AccountType":"Savings"}
Потоковый код темы: (это второй потоковый код и тема)
builder.<String,String>stream(topic)
.filter(new Predicate <String, String>() {
@Override
public boolean test(String key, String value) {
// put you processor logic here
System.out.println("value : " + value);
return value.substring(0).equals(“1”);
}
})
.to(streamouttopic);
final KafkaStreams streams = new KafkaStreams(builder, props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
Я хочу подать, если значение UserID равно «1», а затем отправить эти данные в тему потоковой передачи.
Когда я использую «.filter» и печатаю System.out.println («значение:» + значение); он выдает следующую ошибку при выполнении.
Exception in thread "SampleStreamProducer-a6bb543e-bb92-48d0-8d9f-225046722d81-StreamThread-1" java.lang.ClassCastException: [B cannot be cast to java.lang.String
Если я не использую «.filter» и использую простой код, подобный этому, builder.stream(topic).to(streamouttopic);
, он работает нормально, но без фильтрации. Но мне нужно использовать этот фильтр.
Может кто-нибудь подсказать мне, как это исправить?