В настоящее время мы разрабатываем Apache Beam конвейер для чтения данных из GCP Pub / Sub и записи полученных данных в корзину в AWS S3.
Мы используем TextIO.write
в beam-sdks-java-io.amazon-web-services
для записи в S3.
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards)
.withTempDirectory(FileBasedSink.convertToFileResourceIfPossible(options.getTempLocation))
.to(FileBasedSink.convertToFileResourceIfPossible(options.getOutputDirectory))
Итак, сначала мы протестировали этот конвейер локально, используя DirectRunner
, и он работал нормально. (Данные, поступающие из Pub / Sub, были получены конвейером и записаны в S3.
options.setRunner(classOf[DirectRunner])
options.setStagingLocation("./outputFolder/staging")
options.setTempLocation("s3://my-s3-bucket/temp")
options.setOutputDirectory("s3://my-s3-bucket/output")
В последней части мы хотели запустить этот конвейер с использованием обработчика потока данных без каких-либо изменений кода, поэтому мы изменили код на используйте DataflowRunner
options.setRunner(classOf[DataflowRunner])
options.setStagingLocation("gs://my-gcs-bucket/binaries")
options.setGcpTempLocation("gs://my-gcs-bucket/temp")
options.setTempLocation("s3://my-s3-bucket/temp")
options.setOutputDirectory("s3://my-s3-bucket/output")
С этим параметром данные принимаются конвейером из pub / sub, но не записываются в S3. Также нет ошибок, записываемых в журналы потока данных в StackDriver.
Кто-нибудь знает, в чем может быть проблема? Неправильная ли конфигурация параметров конвейера? Или запись в S3 завершается с ошибкой?
У кого-нибудь есть предложения по настройке журналов в beam-sdks-java-io.amazon-web-services
вывести протокол уровня DEBUG?
Спасибо!