Я использовал следующий код согласно этому примеру. Однако заставить его работать не удалось.
namespace com.ibm.streamsx.messaging.sample.kafka ;
use com.ibm.streamsx.messaging.kafka::* ;
use com.ibm.streamsx.json::* ;
/**
 * Consumes records from a Kafka topic, parses the JSON carried in each
 * record's `message` attribute into a JsonType tuple, then prints every
 * parsed tuple and also writes it to a file.
 *
 * @param topic Name of the Kafka topic to consume. Defaults to
 *              "nifi-log-batch", the topic this sample was previously
 *              hard-wired to.
 */
composite KafkaSample
{
	param
		// This parameter used to be declared but never referenced, while the
		// consumer below hard-coded its topic. It is now wired into the
		// consumer; the default keeps the topic that was actually consumed.
		expression<rstring> $topic : "nifi-log-batch" ;

	type
		/**
		 * the `message` term is necessary for Kafka input.
		 */
		KafkaData = rstring key, rstring message ;

		// Target schema for the parsed JSON payload.
		JsonType = rstring Timestamp, rstring level, rstring thread,
			rstring classes, rstring messages, rstring stacktrace ;

	graph
		// Source: one KafkaData tuple per consumed Kafka record.
		stream<KafkaData> KafkaStream = KafkaConsumer()
		{
			param
				propertiesFile : "etc/consumer.properties" ;
				topic : $topic ;
		}

		// Disabled debugging aid: dump the raw Kafka tuples to a file.
		// () as File_SInk = FileSink(KafkaStream)
		// {
		// param
		// file : "/opt/ibm/InfoSphere_Streams/data/message_output.log" ;
		// }

		// Disabled alternative: convert the Kafka tuple to a JSON string.
		// stream<rstring jsonMessage> ConvertTupleToJson = TupleToJSON(KafkaStream)
		// {
		// }

		// Parse the JSON text held in `message` into JsonType attributes.
		stream<JsonType> ParsedJson = JSONToTuple(KafkaStream)
		{
			param
				inputAttribute : message ; // specify which attribute contains the json
		}

		// Print every parsed tuple, rendered as a string.
		() as SinkOp = Custom(ParsedJson)
		{
			logic
				onTuple ParsedJson :
				{
					printStringLn("Message: " +(rstring) ParsedJson) ;
				}
		}

		// Write the parsed tuples to a file, with string quoting turned off.
		() as File_SInk = FileSink(ParsedJson)
		{
			param
				file : "/opt/ibm/InfoSphere_Streams/data/kafka_output.log" ;
				quoteStrings : false ;
		}
}
Я получаю следующую ошибку:
24 May 2019 05:13:18.978-0500 [11322] ERROR #splapptrc,J[0],P[0],KafkaStream M[?:com.ibm.streamsx.messaging.kafka.KafkaSource.produceTuples:-1] - Failed to make progress reading messages at nifi-log-batch-0=35. Received a non-empty fetch response from the server, but no complete records were found.
org.apache.kafka.common.KafkaException: Failed to make progress reading messages at nifi-log-batch-0=35. Received a non-empty fetch response from the server, but no complete records were found.
at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:813)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:482)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1062)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:996)
at com.ibm.streamsx.messaging.kafka.KafkaConsumerClient.getRecords(KafkaConsumerClient.java:94)
at com.ibm.streamsx.messaging.kafka.KafkaSource.produceTuples(KafkaSource.java:290)
at com.ibm.streamsx.messaging.kafka.KafkaSource$1.run(KafkaSource.java:256)
at java.lang.Thread.run(Thread.java:811)
at com.ibm.streams.operator.internal.runtime.OperatorThreadFactory$2.run(OperatorThreadFactory.java:137)
Я пробовал также с TupleToJSON, но без улучшений. Чего мне здесь не хватает?
Ключи в моей схеме JSON и имена атрибутов в типе данных не совпадают. Это проблема? Схема JSON:
{"src" : "helloworld.jpg", "width" : 500, "height" : 500, "alignment" : "center", "class": "first", "timestamp": "10th July"}
Используемый тип SPL:
// Tuple type intended to receive the image JSON sample.
// NOTE(review): the JSON sample uses the keys "timestamp" and "class",
// whereas this type names the corresponding attributes `ts` and `cls`.
type Img =
	rstring src,
	int64 width,
	rstring alignment,
	int64 height,
	rstring ts,
	rstring cls ;