Преобразовать объектный узел в json узел - PullRequest
0 голосов
/ 24 января 2020

Здесь DataStream возвращает пару ключ-значение в виде объекта, мне нужно значение ключа напрямую, а не как объект, потому что мне нужно сгруппировать значения на основе ключа.

DataStream<ObjectNode> stream = env
    .addSource(new FlinkKafkaConsumer<>("test5", new JSONKeyValueDeserializationSchema (false), properties));

// stream.keyBy("record1").print();

когда я даю stream.keyby ("record1" ).Распечатать(); это показывает

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>) cannot be used as key.
    at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
    at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:337)
    at ReadFromKafka.main(ReadFromKafka.java:27)

Ответы [ 2 ]

0 голосов
/ 24 января 2020

Ответ Дэвида Андерсона является правильным, в дополнение, я могу добавить, что Вы можете просто создать KeySelector, который будет извлекать ключ как String. Это может выглядеть так:

public class JsonKeySelector implements KeySelector<ObjectNode, String> {
    @Override
    public String getKey(ObjectNode jsonNodes) throws Exception {
        return jsonNodes.get("key").asText();
    }
}

Это, очевидно, предполагает, что ключ должен быть String.

0 голосов
/ 24 января 2020

Существует несколько способов указать ключевой селектор в клавише FlinkBy. Например, если у вас есть POJO типа Event с ключом String в поле с именем «id», любой из них будет работать:

stream.keyBy("id")
stream.keyBy(event -> event.id)
stream.keyBy(
    new KeySelector<Event, String>() {
        @Override
        public String getKey(Event event) throws Exception {
            return event.id;
        }
    }
)

До тех пор, пока вы можете вычислить ключ от объекта в детерминированном порядке c, вы можете сделать эту работу.

...