Я впервые использую Apache Flink с AWS Kineses.По сути, моя цель - преобразовать входящие данные из потока Kinesis таким образом, чтобы я мог выполнять простые преобразования, такие как фильтрация и агрегация.
Я добавляю источник, используя нижеприведенное:
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
В конце концов, когда я печатаю входящий поток, я получаю данные json, как и ожидалось:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> input = createSourceFromStaticConfig(env);
input.print();
Это пример результата печати:
{"event_num": "5530", "timestmap": "2019-03-04 14: 29: 44.882376", "amount": "80.4", "type": "Purchase"} {"event_num": "5531", "timestmap": "2019-03-04 14: 29: 44.881379", "amount": "11.98", "type": "Service"}
Может кто-нибудь объяснить мне, как я могу получить доступ к этим элементам jsonтаким образом, чтобы я мог выполнить простое преобразование, например, выбрав только записи, содержащие «Service» в качестве типа?