Флинк десериализовать Кафка JSON - PullRequest
0 голосов
/ 02 мая 2020

Я пытаюсь прочитать сообщение json от kafka topi c с помощью flink.

Я использую Kafka 2.4.1 и Flink 1.10

для моего потребителя. установить:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;


FlinkKafkaConsumer<ObjectNode> sensorConsumer = new FlinkKafkaConsumer(KAFKA_TOPIC_INPUT, 
                new JSONKeyValueDeserializationSchema(false), properties);

когда я использую SimpleStringSchema, я получаю json в качестве текста, что хорошо, но с JSONKeyValueDeserializer я получаю:

Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'sensor_5': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')

sensor_5 будет введите topi c Я предполагаю, что мне нужно добавить что-то еще, чтобы получить JSON из значения сообщения kafka, переданного в сериализатор, и как-то обработать ключ, но я не уверен?

Любой предложения?

Структура json имеет вид:

{"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}

и передается через

# Python 3
import json
from confluent_kafka import Producer

dict_obj = {"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}
producer = Producer({'bootstrap.servers': "kafka:9092"})

producer.produce(topic='sensors-raw', key='sensor_5', value=json.dumps(dict_obj))

Ответы [ 2 ]

1 голос
/ 02 мая 2020

Итак, в основном, если вы посмотрите на исходный код JSONKeyValueDeserializationSchema, вы увидите, что он выглядит следующим образом:

    if (mapper == null) {
            mapper = new ObjectMapper();
        }
        ObjectNode node = mapper.createObjectNode();
        if (record.key() != null) {
            node.set("key", mapper.readValue(record.key(), JsonNode.class));
        }
        if (record.value() != null) {
            node.set("value", mapper.readValue(record.value(), JsonNode.class));
        }
        if (includeMetadata) {
            node.putObject("metadata")
                .put("offset", record.offset())
                .put("topic", record.topic())
                .put("partition", record.partition());
        }
        return node;

Итак, обычно схема ожидает, что ваш ключ JSON не String, поэтому он потерпит неудачу для sensor_5. Я думаю, что лучшим и самым простым решением было бы создать собственную реализацию, в которой в качестве ключа используется String.

0 голосов
/ 03 мая 2020

Вы можете реализовать DeserializationSchema вместо KeyedDeserializationSchema, если не хотите включать свой ключ в свою запись.

Пример будет выглядеть следующим образом:

public class JSONValueDeserializationSchema implements DeserializationSchema<ObjectNode> {

    private static final long serialVersionUID = -1L;

    private ObjectMapper mapper;

    @Override
    public ObjectNode deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        ObjectNode node = mapper.createObjectNode();
        if (message != null) {
            node.set("value", mapper.readValue(message, JsonNode.class));
        }
        return node;
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return getForClass(ObjectNode.class);
    }
}

Если вы также хотите включить ключ в свою запись, вы можете реализовать KeyedDeserializationSchema, как упомянуто в ответе Доминика Восински .

...