Я пытаюсь прочитать сообщение json от kafka topi c с помощью flink.
Я использую Kafka 2.4.1 и Flink 1.10
для моего потребителя. установить:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
FlinkKafkaConsumer<ObjectNode> sensorConsumer = new FlinkKafkaConsumer(KAFKA_TOPIC_INPUT,
new JSONKeyValueDeserializationSchema(false), properties);
когда я использую SimpleStringSchema
, я получаю json в качестве текста, что хорошо, но с JSONKeyValueDeserializer я получаю:
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'sensor_5': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
sensor_5
будет введите topi c Я предполагаю, что мне нужно добавить что-то еще, чтобы получить JSON из значения сообщения kafka, переданного в сериализатор, и как-то обработать ключ, но я не уверен?
Любой предложения?
Структура json имеет вид:
{"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}
и передается через
# Python 3
import json
from confluent_kafka import Producer
dict_obj = {"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}
producer = Producer({'bootstrap.servers': "kafka:9092"})
producer.produce(topic='sensors-raw', key='sensor_5', value=json.dumps(dict_obj))