Разбиение входящих записей по ключу поля JSON - PullRequest
0 голосов
/ 03 августа 2020

Новое в Apache Подмигивайте и играюсь, и я пытаюсь добиться разделения приемника с помощью ключа поля JSON.

Вот пример данных, которые вставляются в поток данных Kinesis:

{"user_id": 1337, "some_field": "data"}
{"user_id": 55, "some_field": "data"}

Я хочу, чтобы задание Apache Flink потребляло эти данные через поток данных Kinesis, а затем сохраняло их в S3, добавляя к ключу префикс со значением «user_id», например /user-1337/data-partition.json, где сохраняются только поля user_id.

Вот пример кода:

public class LogProcessingJob {
    private static final ObjectMapper jsonParser = new ObjectMapper();
    private static final String region = "us-east-1";
    private static final String inputStreamName = "testing-apache-flink";
    private static final String s3SinkPath = "s3a://testing-apache-flink/data";

    private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
    }

    private static StreamingFileSink<Tuple2> createS3SinkFromStaticConfig() {
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("prefix") // HOW TO GET user_id here?
                .withPartSuffix(".json")
                .build();

        return StreamingFileSink
                .forRowFormat(new Path(s3SinkPath), new SimpleStringEncoder<Tuple2>("UTF-8"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withOutputFileConfig(config)
                .build();
    }

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /* if you would like to use runtime configuration properties, uncomment the lines below
         * DataStream<String> input = createSourceFromApplicationProperties(env);
         */
        DataStream<String> input = createSourceFromStaticConfig(env);

        input.map(value -> { // Parse the JSON
            JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
            return new Tuple2(jsonNode.get("user_id").asInt(),
                    jsonNode.get("status").asText());
        }).returns(Types.TUPLE(Types.INT, Types.STRING))
                .keyBy(event -> event.f0) // partition by user_id
                .addSink(createS3SinkFromStaticConfig());

        env.execute("Process log files");
    }
}

Как получить user_id в OutputFileConfig или есть лучший способ для этого?

1 Ответ

1 голос
/ 07 августа 2020

Flink предоставляет интерфейс BucketAssigner, который позволяет вам указать Bucket каждый входящий элемент, который должен быть помещен.

Согласно документации,

/**
 * A BucketAssigner is used with a {@link StreamingFileSink} to determine the {@link Bucket} each incoming element
 * should be put into.
 *
 * <p>The {@code StreamingFileSink} can be writing to many buckets at a time, and it is responsible for managing
 * a set of active buckets. Whenever a new element arrives it will ask the {@code BucketAssigner} for the bucket the
 * element should fall in. The {@code BucketAssigner} can, for example, determine buckets based on system time.
 *
 * @param <IN> The type of input elements.
 * @param <BucketID> The type of the object returned by the {@link #getBucketId(Object, BucketAssigner.Context)}. This has to have
 *                  a correct {@link #hashCode()} and {@link #equals(Object)} method. In addition, the {@link Path}
 *                  to the created bucket will be the result of the {@link #toString()} of this method, appended to
 *                  the {@code basePath} specified in the {@link StreamingFileSink StreamingFileSink}.
 */

Итак, в В вашем случае вы можете реализовать пользовательский BucketAssigner как это

public class UserIdBucketAssigner<T extends Tuple2> implements BucketAssigner<T, String> {
  private Long serialVersionUID = 1L

  public String getBucketId(T element, BucketAssigner.Context context) {
    return element._1
  }

  public SimpleVersionedSerializer<String> getSerializer() {
    SimpleVersionedStringSerializer.INSTANCE
}

}

и указать его в конструкции StreamingFileSink:

private static StreamingFileSink<Tuple2> createS3SinkFromStaticConfig() {
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("prefix") // HOW TO GET user_id here?
                .withPartSuffix(".json")
                .build();

        return StreamingFileSink
                .forRowFormat(new Path(s3SinkPath), new SimpleStringEncoder<Tuple2>("UTF-8"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withBucketAssigner(new UserIdBucketAssigner<Tuple2>)
                .withOutputFileConfig(config)
                .build();
    }
...