Анализ json из входящего потока данных для выполнения простых преобразований в Flink - PullRequest
0 голосов
/ 04 марта 2019

Я впервые использую Apache Flink с AWS Kineses.По сути, моя цель - преобразовать входящие данные из потока Kinesis таким образом, чтобы я мог выполнять простые преобразования, такие как фильтрация и агрегация.

Я добавляю источник, используя нижеприведенное:

return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

В конце концов, когда я печатаю входящий поток, я получаю данные json, как и ожидалось:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> input = createSourceFromStaticConfig(env);
input.print();

Это пример результата печати:

{"event_num": "5530", "timestmap": "2019-03-04 14: 29: 44.882376", "amount": "80.4", "type": "Purchase"} {"event_num": "5531", "timestmap": "2019-03-04 14: 29: 44.881379", "amount": "11.98", "type": "Service"}

Может кто-нибудь объяснить мне, как я могу получить доступ к этим элементам jsonтаким образом, чтобы я мог выполнить простое преобразование, например, выбрав только записи, содержащие «Service» в качестве типа?

1 Ответ

0 голосов
/ 04 марта 2019

Поскольку вы используете SimpleStringSchema, результирующий поток событий имеет тип String.Поэтому вам нужно сначала проанализировать строку, а затем применить фильтры и т. Д.

Возможно, вы захотите взглянуть на JsonNodeDeserializationSchema , которая выдаст ObjectNode.

...