Проблема заключается в выводе типа Scala в сочетании с идиомой самостоятельного ввода, используемой строителями StreamingFileSink
.
В качестве быстрого решения вы можете вставить приведение:
val sink = StreamingFileSink
.forRowFormat(new Path("s3a://123"), csvEncoder)
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMinutes(5))
.withInactivityInterval(TimeUnit.MINUTES.toMinutes(5))
.withMaxPartSize(128 * 1024 * 1024)
.build()
)
.withBucketAssigner(
new BucketAssigner[UserEvent, String] {
override def getBucketId(element: UserEvent, context: BucketAssigner.Context): String = element.getType.name
override def getSerializer: SimpleVersionedSerializer[String] = new SimpleVersionedStringSerializer
}
).asInstanceOf[StreamingFileSink.RowFormatBuilder[UserEvent, String, _]]
.build()
Правильное исправление требует изменения Flink. Вы можете отследить FLINK-16684 , чтобы получать уведомления о правильном устранении проблемы.
Обновление
Проблема была исправлена с помощью Flink 1.10.1 и 1.11.0.