Новое в Apache Подмигивайте и играюсь, и я пытаюсь добиться разделения приемника с помощью ключа поля JSON.
Вот пример данных, которые вставляются в поток данных Kinesis:
{"user_id": 1337, "some_field": "data"}
{"user_id": 55, "some_field": "data"}
Я хочу, чтобы задание Apache Flink потребляло эти данные через поток данных Kinesis, а затем сохраняло их в S3, добавляя к ключу префикс со значением «user_id», например /user-1337/data-partition.json
, где сохраняются только поля user_id.
Вот пример кода:
public class LogProcessingJob {
private static final ObjectMapper jsonParser = new ObjectMapper();
private static final String region = "us-east-1";
private static final String inputStreamName = "testing-apache-flink";
private static final String s3SinkPath = "s3a://testing-apache-flink/data";
private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
Properties inputProperties = new Properties();
inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
}
private static StreamingFileSink<Tuple2> createS3SinkFromStaticConfig() {
OutputFileConfig config = OutputFileConfig
.builder()
.withPartPrefix("prefix") // HOW TO GET user_id here?
.withPartSuffix(".json")
.build();
return StreamingFileSink
.forRowFormat(new Path(s3SinkPath), new SimpleStringEncoder<Tuple2>("UTF-8"))
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withOutputFileConfig(config)
.build();
}
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/* if you would like to use runtime configuration properties, uncomment the lines below
* DataStream<String> input = createSourceFromApplicationProperties(env);
*/
DataStream<String> input = createSourceFromStaticConfig(env);
input.map(value -> { // Parse the JSON
JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
return new Tuple2(jsonNode.get("user_id").asInt(),
jsonNode.get("status").asText());
}).returns(Types.TUPLE(Types.INT, Types.STRING))
.keyBy(event -> event.f0) // partition by user_id
.addSink(createS3SinkFromStaticConfig());
env.execute("Process log files");
}
}
Как получить user_id в OutputFileConfig или есть лучший способ для этого?