Фильтр Кафка Потоки - PullRequest
0 голосов
/ 09 мая 2018

Я проверял потоки Кафки. Я тестировал приведенный ниже код для потоков Kafka

Тема продюсера: (это первая тема продюсера, которая отправляет нижеприведенные данные json)

KafkaProducer<String, String> producer = new KafkaProducer<>(
                    properties);

    producer.send(new ProducerRecord<String,String>(topic, jsonobject.toString()));
                  producer.close();

JSON - продюсер по теме:

{"UserID":"1","Address”:”XXX”,”AccountNo":"234234","MemberName”:”Stella”,”AccountType":"Savings"}

Потоковый код темы: (это второй потоковый код и тема)

builder.<String,String>stream(topic)
           .filter(new Predicate <String, String>() {
               @Override

            public boolean test(String key, String value) {

                   // put you processor logic here
                   System.out.println("value : " + value);

                   return value.substring(0).equals(“1”);
               }
            }) 
           .to(streamouttopic);

         final KafkaStreams streams = new KafkaStreams(builder, props);
         final CountDownLatch latch = new CountDownLatch(1);

            // attach shutdown handler to catch control-c
            Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
                @Override
                public void run() {
                    streams.close();
                    latch.countDown();
                }
            });

            try {
                streams.start();
                latch.await();
            } catch (Throwable e) {
                System.exit(1);
            }
            System.exit(0);

Я хочу подать, если значение UserID равно «1», а затем отправить эти данные в тему потоковой передачи.

Когда я использую «.filter» и печатаю System.out.println («значение:» + значение); он выдает следующую ошибку при выполнении.

Exception in thread "SampleStreamProducer-a6bb543e-bb92-48d0-8d9f-225046722d81-StreamThread-1" java.lang.ClassCastException: [B cannot be cast to java.lang.String

Если я не использую «.filter» и использую простой код, подобный этому, builder.stream(topic).to(streamouttopic);, он работает нормально, но без фильтрации. Но мне нужно использовать этот фильтр.

Может кто-нибудь подсказать мне, как это исправить?

1 Ответ

0 голосов
/ 10 мая 2018

По умолчанию Kafka Streams предполагает тип данных <byte[],byte[]>, а byte[] нельзя преобразовать в String.

При чтении темы необходимо указать Serdes как KStream:

builder.<String,String>stream(topic, Consumed.with(Serdes.String(), Serdes.String())
       .filter(...)

Пожалуйста, ознакомьтесь с примерами и прочитайте документы:

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...