У меня есть конвейер, использующий лямбду и EMR, где я читаю csv из одной учетной записи S3 и записываю паркет в другую s3 в учетной записи B. Я создал EMR в учетной записи B и имею доступ к s3 в учетной записи B. Я не могу добавить учетную запись A Доступ к корзине s3 в EMR_EC2_DefaultRole (так как эта учетная запись является хранилищем данных предприятия), поэтому я использую accessKey, секретный ключ для доступа к учетной записи A корзина s3. Это делается с помощью токена congnito.
METHOD1
Я использую протокол fs.s3 для чтения csv из s3 из учетной записи A и записи в s3 из учетной записи B. У меня есть код pyspark, который читает из s3 (A) и пишет в паркет s3 (B), который я отправляю задание 100 заданий одновременно. Этот код pyspark выполняется в EMR.
Чтение с использованием следующих настроек
hadoop_config = sc._jsc.hadoopConfiguration()
hadoop_config.set("fs.s3.awsAccessKeyId", dl_access_key)
hadoop_config.set("fs.s3.awsSecretAccessKey", dl_secret_key)
hadoop_config.set("fs.s3.awsSessionToken", dl_session_key)
spark_df_csv = spark_session.read.option("Header", "True").csv("s3://somepath")
Запись:
Я использую протокол s3a s3a://some_bucket/
Это работает, но иногда я вижу
- _временная папка присутствует в корзине s3 и не все CSV преобразуются в паркет
- Когда я включаю параллелизм EMR в 256 (EMR-5.28 ) и отправьте 10 0 заданий это я получаю _временная ошибка переименования.
Проблемы:
- Этот метод создает временную папку, а иногда и не удаляет ее. Я могу видеть _teвременную папку в s3 bucket.
- , когда я включаю EMR-параллелизм (последняя версия EMR5.28), она позволяет выполнять шаги параллельно, я получаю переименование _teorary error для некоторых файлов.
МЕТОД 2:
Мне кажется, что s3a не подходит для параллельной работы. Поэтому я хочу читать и писать, используя fs.s3, так как он имеет лучший файл commiters .
Так что я сделал это изначально, я установил конфигурацию oop, как указано выше, для учетной записи A, а затем отключил конфигурации, так что он может получить доступ к учетной записи по умолчанию B в конце концов S3 Bucket. Таким образом
hadoop_config = sc._jsc.hadoopConfiguration()
hadoop_config.unset("fs.s3.awsAccessKeyId")
hadoop_config.unset("fs.s3.awsSecretAccessKey")
hadoop_config.unset("fs.s3.awsSessionToken")
spark_df_csv.repartition(1).write.partitionBy(['org_id', 'institution_id']). \
mode('append').parquet(write_path)
Проблемы :
Это работает, но проблема, скажем, если я запускаю лямбду, которая, в свою очередь, отправляет задание для 100 файлов (в l oop) около 10 нечетных файлов приводят к отказу в доступе при записи файла в корзину s3.
java .util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor. java: 624) \ n ... еще 1 \ nПричинено: com.amazon.ws.emr. имел oop .fs.shaded.com.amazon aws .services.s3.model.AmazonS3Exception: доступ запрещен (Служба:
Это может быть из-за того, что эта неустановка иногда не работает или из-за параллельного запуска сброса контекста / набора сеансов Spark, происходящего при распараллеливании? Я имею в виду, что спарк-контекст для одного задания сбрасывает конфигурацию oop, а другой задает ее, что может вызвать эту проблему, хотя и не уверен, как контекст спарка работает в параллельно.
Разве у каждой работы нет отдельного контекста и сеанса Spark. Пожалуйста, предложите альтернативы для моей ситуации.