Я пытаюсь использовать потребительский источник Kafka с приемником потокового файла до S3
final StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(“s3a://test/test”), new SimpleStringEncoder("UTF-8"))
.withBucketAssigner(new DateTimeBucketAssigner(outputBucketFormat))
.withRollingPolicy(rollingPolicy)
.build();
У меня есть added s3a.access.key
и s3a.secret.key
до fink-conf.yaml
и скопированы ./opt/flink-s3-fs-hadoop-1.9.0.jar
в ./plugins/s3-fs-hadoop/
как упоминается в документации
Но это не работает, не генерирует никаких исключений в журналах, вместо этого обработка просто зависает. Если я использую приемник FileSystem, он работает нормально.
Обработка застревает в следующих журналах:
2020-01-06 23:51:47,237 INFO org.apache.flink.fs.shaded.hadoop3.org.apache.commons.beanutils.FluentPropertyBeanIntrospector -
Error when creating PropertyDescriptor for public final void org.apache.flink.fs.shaded.hadoop3.org.apache.commons.configuration2.AbstractConfiguration.setProperty(java.lang.String,java.lang.Object)! Ignoring this property.
2020-01-06 23:51:47,249 WARN org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsConfig -
Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2020-01-06 23:51:47,275 INFO org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsSystemImpl -
Scheduled Metric snapshot period at 10 second(s).
2020-01-06 23:51:47,276 INFO org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.metrics2.impl.MetricsSystemImpl -
s3a-file-system metrics system started
Требуется ли какая-либо дополнительная конфигурация для S3? Протестировано с обеими версиями Flink: 1.9.0 и 1.9.1.
Найдено https://issues.apache.org/jira/browse/FLINK-14574, но даже после установки jar flink-s3-fs-had oop под lib не работает , Пожалуйста, дайте мне знать, если есть какая-либо рабочая конфигурация с любым из изображений флинка, доступных на docker -hub