Flink StreamingFileSink не попадает на S3, когда контрольная точка отключена - PullRequest
0 голосов
/ 27 мая 2019

Я хочу использовать aws s3 в качестве приемника для потока данных в flink.Я использую класс StreamingFileSink для создания приемника.

Мне не нужны контрольные точки для моей работы, но когда я отключаю контрольные точки, данные больше не записываются в S3.

case 1: контрольная точка включена
Когда контрольная точка включена,данные успешно поступают на указанный путь s3.

вариант 2: контрольная точка отключена
Когда контрольная точка отключена, данные не записываются в s3.
Я пытался выполнить задание несколько раз, но каждый раз получал один и тот же результат.Я сталкиваюсь с этим на локальной машине, а также на кластере kubernetes.

object FlinkTestJob {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // with checkpointing enabled
    env.enableCheckpointing(100)

    // Sinks
    val streamStrings: Seq[String] =
      Seq("test1", "test2", "test3", "test4", "test5", "test6", "test7", "test8", "test9", "test10")

    val testStream = env.fromCollection(streamStrings)

    val rollingPolicy = new RollingPolicy[String, String] {

      override def shouldRollOnCheckpoint(partFileState: PartFileInfo[String]): Boolean =
        partFileState.getSize > 1

      override def shouldRollOnEvent(
          partFileState: PartFileInfo[String],
          element: String): Boolean = true

      override def shouldRollOnProcessingTime(
          partFileState: PartFileInfo[String],
          currentTime: Long): Boolean = true
    }

    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("s3a://testbucket/sink"), new SimpleStringEncoder[String]("UTF-8"))
      .withRollingPolicy(rollingPolicy)
      .build()

    testStream.addSink(sink)
    env.execute("test-job")
  }
}

Когда я пишу в s3, используя «writeAsText (« s3a: // testbucket / sink »)» вместо StreamingFileSink, он отлично работает независимо от того, включена ли контрольная точка.

Версия Flink: 1.8.0
Я хочу понять связь между контрольными точками и StreamingFileSink.
Спасибо

...