У меня есть коллекция, которая представляет поток данных и тестирует StreamingFileSink для записи потока в S3. Программа успешно запущена, но в указанном пути S3 нет данных.
public class S3Sink {
public static void main(String args[]) throws Exception {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.enableCheckpointing(100);
List<String> input = new ArrayList<>();
input.add("test");
DataStream<String> inputStream = see.fromCollection(input);
RollingPolicy<Object, String> rollingPolicy = new CustomRollingPolicy();
StreamingFileSink s3Sink = StreamingFileSink.
forRowFormat(new Path("<S3 Path>"),
new SimpleStringEncoder<>("UTF-8"))
.withRollingPolicy(rollingPolicy)
.build();
inputStream.addSink(s3Sink);
see.execute();
}
}
Контрольная точка также включена. Есть какие-нибудь мысли о том, почему Sink не работает должным образом?
ОБНОВЛЕНИЕ: На основании ответа Дэвида создан пользовательский источник, который непрерывно генерирует случайную строку, и я ожидаю, что контрольная точка сработает через заданный интервал для записи данных в S3.
public class S3SinkCustom {
public static void main(String args[]) throws Exception {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.enableCheckpointing(1000);
DataStream<String> inputStream = see.addSource(new CustomSource());
RollingPolicy<Object, String> rollingPolicy = new CustomRollingPolicy();
StreamingFileSink s3Sink = StreamingFileSink.
forRowFormat(new Path("s3://mybucket/data/"),
new SimpleStringEncoder<>("UTF-8"))
.build();
//inputStream.print();
inputStream.addSink(s3Sink);
see.execute();
}
static class CustomSource extends RichSourceFunction<String> {
private volatile boolean running = false;
final String[] strings = {"ABC", "XYZ", "DEF"};
@Override
public void open(Configuration parameters){
running = true;
}
@Override
public void run(SourceContext sourceContext) throws Exception {
while (running) {
Random random = new Random();
int index = random.nextInt(strings.length);
sourceContext.collect(strings[index]);
Thread.sleep(1000);
}
}
@Override
public void cancel() {
running = false;
}
}
}
Тем не менее, в s3 нет данных, и Flink Process даже не проверяет, является ли корзина S3 действительной или нет, но процесс запущен без проблем.
Обновление:
Ниже приведены подробные сведения о пользовательской скользящей политике:
public class CustomRollingPolicy implements RollingPolicy<Object, String> {
@Override
public boolean shouldRollOnCheckpoint(PartFileInfo partFileInfo) throws IOException {
return partFileInfo.getSize() > 1;
}
@Override
public boolean shouldRollOnEvent(PartFileInfo partFileInfo, Object o) throws IOException {
return true;
}
@Override
public boolean shouldRollOnProcessingTime(PartFileInfo partFileInfo, long l) throws IOException {
return true;
}
}