Flink StreamingFileSink RowFormatBuilder withBucketAssigner возвращает Any? - PullRequest
1 голос
/ 19 марта 2020

Почему эта конфигурация приводит к любому типу? Я не могу позвонить .build ()! Моя версия Flink 1.10.0 и scala версия 2.11 Ссылка на скриншот

    val sink = StreamingFileSink
      .forRowFormat(new Path("s3a://123"), csvEncoder)
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          .withRolloverInterval(TimeUnit.MINUTES.toMinutes(5))
          .withInactivityInterval(TimeUnit.MINUTES.toMinutes(5))
          .withMaxPartSize(128 * 1024 * 1024)
          .build()
      )
      .withBucketAssigner(
        new BucketAssigner[UserEvent, String] {
          override def getBucketId(element: UserEvent, context: BucketAssigner.Context): String = element.getType.name
          override def getSerializer: SimpleVersionedSerializer[String] = new SimpleVersionedStringSerializer
        }
      ) // this returns Any!!!
      .build() // can't call .build()

1 Ответ

0 голосов
/ 19 марта 2020

Проблема заключается в выводе типа Scala в сочетании с идиомой самостоятельного ввода, используемой строителями StreamingFileSink.

В качестве быстрого решения вы можете вставить приведение:

val sink = StreamingFileSink
  .forRowFormat(new Path("s3a://123"), csvEncoder)
  .withRollingPolicy(
    DefaultRollingPolicy.builder()
      .withRolloverInterval(TimeUnit.MINUTES.toMinutes(5))
      .withInactivityInterval(TimeUnit.MINUTES.toMinutes(5))
      .withMaxPartSize(128 * 1024 * 1024)
      .build()
  )
  .withBucketAssigner(
    new BucketAssigner[UserEvent, String] {
      override def getBucketId(element: UserEvent, context: BucketAssigner.Context): String = element.getType.name
      override def getSerializer: SimpleVersionedSerializer[String] = new SimpleVersionedStringSerializer
    }
  ).asInstanceOf[StreamingFileSink.RowFormatBuilder[UserEvent, String, _]]
  .build()

Правильное исправление требует изменения Flink. Вы можете отследить FLINK-16684 , чтобы получать уведомления о правильном устранении проблемы.

Обновление

Проблема была исправлена ​​с помощью Flink 1.10.1 и 1.11.0.

...