Не удалось получить сообщение JSON от Kafka в IBM InfoSphere Streams - PullRequest
0 голосов
/ 24 мая 2019

Я использовал следующий код согласно this .Однако, не удалось заставить это работать.

namespace com.ibm.streamsx.messaging.sample.kafka ;

use com.ibm.streamsx.messaging.kafka::* ;
use com.ibm.streamsx.json::* ;

composite KafkaSample
{
    param
        expression<rstring> $topic : "sampleTopic" ;
    type 
        /**
        * the `message` term is necessary for Kafka input.
        */
        KafkaData = rstring key, rstring message;
        JsonType = rstring Timestamp, rstring level, rstring thread, rstring classes, rstring messages, rstring stacktrace ;
    graph
        stream<KafkaData> KafkaStream = KafkaConsumer()
        {
            param
                propertiesFile : "etc/consumer.properties" ;
                topic : "nifi-log-batch" ;
        }
        // () as File_SInk = FileSink(KafkaStream)
        // {
        //  param
        //      file : "/opt/ibm/InfoSphere_Streams/data/message_output.log" ;
        // }
        /* Convert the Tweet tuple to a JSON string */
        // stream<rstring jsonMessage> ConvertTupleToJson = TupleToJSON(KafkaStream)
        // {
        // }

        stream<JsonType> ParsedJson = JSONToTuple(KafkaStream)
        {
            param
                inputAttribute: message; //specify which attribute contains the json
        }

        () as SinkOp = Custom(ParsedJson)
        {
            logic
                onTuple ParsedJson :
                {
                    printStringLn("Message: " + (rstring) ParsedJson) ;
                }
        }

        () as File_SInk = FileSink(ParsedJson)
        {
            param
                file : "/opt/ibm/InfoSphere_Streams/data/kafka_output.log" ;
                quoteStrings: false;
        }
}

Я получаю следующую ошибку:

24 May 2019 05:13:18.978-0500 [11322] ERROR #splapptrc,J[0],P[0],KafkaStream M[?:com.ibm.streamsx.messaging.kafka.KafkaSource.produceTuples:-1]  - Failed to make progress reading messages at nifi-log-batch-0=35. Received a non-empty fetch response from the server, but no complete records were found.
org.apache.kafka.common.KafkaException: Failed to make progress reading messages at nifi-log-batch-0=35. Received a non-empty fetch response from the server, but no complete records were found.
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:813)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:482)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1062)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:996)
    at com.ibm.streamsx.messaging.kafka.KafkaConsumerClient.getRecords(KafkaConsumerClient.java:94)
    at com.ibm.streamsx.messaging.kafka.KafkaSource.produceTuples(KafkaSource.java:290)
    at com.ibm.streamsx.messaging.kafka.KafkaSource$1.run(KafkaSource.java:256)
    at java.lang.Thread.run(Thread.java:811)
    at com.ibm.streams.operator.internal.runtime.OperatorThreadFactory$2.run(OperatorThreadFactory.java:137)

Я пробовал также с TupleToJSON , без улучшений.Что мне здесь не хватает?
Мой ключ схемы JSON и ключ типа данных не совпадают.Это проблема?Схема JSON:

{"src" : "helloworld.jpg", "width" : 500, "height" : 500, "alignment" : "center", "class": "first", "timestamp": "10th July"}

Используемый тип SPL:

type Img = rstring src, int64 width, rstring alignment, int64 height, rstring ts, rstring cls;
...