Пользовательский FileInputFormat всегда назначает один файловый раздел на один слот - PullRequest
0 голосов
/ 15 февраля 2019

Я записываю записи protobuf в наши сегменты s3.И я хочу использовать api набора данных flink для чтения из него.Поэтому я реализовал пользовательский FileInputFormat для достижения этой цели.Код как ниже.

public class ProtobufInputFormat extends FileInputFormat<StandardLog.Pageview> {
    public ProtobufInputFormat() {
    }

    private transient boolean reachedEnd = false;

    @Override
    public boolean reachedEnd() throws IOException {
        return reachedEnd;
    }

    @Override
    public StandardLog.Pageview nextRecord(StandardLog.Pageview reuse) throws IOException {
        StandardLog.Pageview pageview = StandardLog.Pageview.parseDelimitedFrom(stream);
        if (pageview == null) {
            reachedEnd = true;
        }
        return pageview;
    }

    @Override
    public boolean supportsMultiPaths() {
        return true;
    }
}


public class BatchReadJob {

    public static void main(String... args) throws Exception {

        String readPath1 = args[0];

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();


        ProtobufInputFormat inputFormat =  new ProtobufInputFormat();
        inputFormat.setNestedFileEnumeration(true);

        inputFormat.setFilePaths(readPath1);

        DataSet<StandardLog.Pageview> dataSource = env.createInput(inputFormat);

        dataSource.map(new MapFunction<StandardLog.Pageview, String>() {
            @Override
            public String map(StandardLog.Pageview value) throws Exception {
                return value.getId();
            }
        }).writeAsText("s3://xxx", FileSystem.WriteMode.OVERWRITE);
        env.execute();

    }

}

Проблема заключается в том, что flink всегда назначает один раздел файла на один слот параллелизма.Другими словами, он всегда обрабатывает то же количество файлов, что и число параллелизма.

Я хочу знать, как правильно реализовать пользовательский FileInputFormat.

Спасибо.

1 Ответ

0 голосов
/ 15 февраля 2019

Я считаю, что поведение, которое вы видите, связано с тем, что ExecutionJobVertex вызывает метод FileInputFormat. createInputSplits() с параметром minNumSplits, равным вершинному (источнику данных) параллелизму.Поэтому, если вам нужно другое поведение, вам придется переопределить метод createInputSplits.

Хотя вы не сказали, какое поведение вы на самом деле хотели.Например, если вам нужно только одно разбиение на файл, вы можете переопределить метод testForUnsplittable() в своем подклассе FileInputFormat, чтобы всегда возвращать true;также следует установить для (защищенного) логического значения unsplittable значение true.

...