Я пытаюсь перевести задание Flink (v1.8 на EMR) с использования BucketingSink на более новую StreamingFileSink.
У меня запущен новый код, и почти все выглядит хорошо. Файлы записываются на S3 и передаются для завершения. Единственная проблема заключается в том, что ACL S3 не установлен, как это было со старым кодом.
У меня есть core-site.xml
, установленный так,
<configuration>
<property>
<name>fs.s3a.acl.default</name>
<value>BucketOwnerFullControl</value>
</property>
</configuration>
Я также использую s3a://
в качестве префикса пути в аргументе forRowFormat()
для построителя StreamingFileSink.
Кроме того, при переключении на StreamingFileSink мне пришлось добавить новую зависимость в мой build.gradle
flinkShadowJar "org.apache.flink:flink-s3-fs-hadoop:${flinkVersion}"
Я не очень ясно, как я писал в S3 с использованием s3a : // префикс без этого фляги, когда я использовал API BucketingSink. Каким-то образом я пишу сейчас, пишу в S3 так, чтобы не уважать мой основной сайт. xml настройки.