Я хочу использовать aws s3 в качестве приемника для потока данных в flink.Я использую класс StreamingFileSink для создания приемника.
Мне не нужны контрольные точки для моей работы, но когда я отключаю контрольные точки, данные больше не записываются в S3.
case 1: контрольная точка включена
Когда контрольная точка включена,данные успешно поступают на указанный путь s3.
вариант 2: контрольная точка отключена
Когда контрольная точка отключена, данные не записываются в s3.
Я пытался выполнить задание несколько раз, но каждый раз получал один и тот же результат.Я сталкиваюсь с этим на локальной машине, а также на кластере kubernetes.
object FlinkTestJob {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// with checkpointing enabled
env.enableCheckpointing(100)
// Sinks
val streamStrings: Seq[String] =
Seq("test1", "test2", "test3", "test4", "test5", "test6", "test7", "test8", "test9", "test10")
val testStream = env.fromCollection(streamStrings)
val rollingPolicy = new RollingPolicy[String, String] {
override def shouldRollOnCheckpoint(partFileState: PartFileInfo[String]): Boolean =
partFileState.getSize > 1
override def shouldRollOnEvent(
partFileState: PartFileInfo[String],
element: String): Boolean = true
override def shouldRollOnProcessingTime(
partFileState: PartFileInfo[String],
currentTime: Long): Boolean = true
}
val sink: StreamingFileSink[String] = StreamingFileSink
.forRowFormat(new Path("s3a://testbucket/sink"), new SimpleStringEncoder[String]("UTF-8"))
.withRollingPolicy(rollingPolicy)
.build()
testStream.addSink(sink)
env.execute("test-job")
}
}
Когда я пишу в s3, используя «writeAsText (« s3a: // testbucket / sink »)» вместо StreamingFileSink, он отлично работает независимо от того, включена ли контрольная точка.
Версия Flink: 1.8.0
Я хочу понять связь между контрольными точками и StreamingFileSink.
Спасибо