Я записываю записи protobuf в наши сегменты s3.И я хочу использовать api набора данных flink для чтения из него.Поэтому я реализовал пользовательский FileInputFormat для достижения этой цели.Код как ниже.
public class ProtobufInputFormat extends FileInputFormat<StandardLog.Pageview> {
public ProtobufInputFormat() {
}
private transient boolean reachedEnd = false;
@Override
public boolean reachedEnd() throws IOException {
return reachedEnd;
}
@Override
public StandardLog.Pageview nextRecord(StandardLog.Pageview reuse) throws IOException {
StandardLog.Pageview pageview = StandardLog.Pageview.parseDelimitedFrom(stream);
if (pageview == null) {
reachedEnd = true;
}
return pageview;
}
@Override
public boolean supportsMultiPaths() {
return true;
}
}
public class BatchReadJob {
public static void main(String... args) throws Exception {
String readPath1 = args[0];
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ProtobufInputFormat inputFormat = new ProtobufInputFormat();
inputFormat.setNestedFileEnumeration(true);
inputFormat.setFilePaths(readPath1);
DataSet<StandardLog.Pageview> dataSource = env.createInput(inputFormat);
dataSource.map(new MapFunction<StandardLog.Pageview, String>() {
@Override
public String map(StandardLog.Pageview value) throws Exception {
return value.getId();
}
}).writeAsText("s3://xxx", FileSystem.WriteMode.OVERWRITE);
env.execute();
}
}
Проблема заключается в том, что flink всегда назначает один раздел файла на один слот параллелизма.Другими словами, он всегда обрабатывает то же количество файлов, что и число параллелизма.
Я хочу знать, как правильно реализовать пользовательский FileInputFormat.
Спасибо.