Конфигурация fs.s3 с двумя учетными записями s3 с EMR - PullRequest
0 голосов
/ 09 января 2020

У меня есть конвейер, использующий лямбду и EMR, где я читаю csv из одной учетной записи S3 и записываю паркет в другую s3 в учетной записи B. Я создал EMR в учетной записи B и имею доступ к s3 в учетной записи B. Я не могу добавить учетную запись A Доступ к корзине s3 в EMR_EC2_DefaultRole (так как эта учетная запись является хранилищем данных предприятия), поэтому я использую accessKey, секретный ключ для доступа к учетной записи A корзина s3. Это делается с помощью токена congnito.

METHOD1

Я использую протокол fs.s3 для чтения csv из s3 из учетной записи A и записи в s3 из учетной записи B. У меня есть код pyspark, который читает из s3 (A) и пишет в паркет s3 (B), который я отправляю задание 100 заданий одновременно. Этот код pyspark выполняется в EMR.

Чтение с использованием следующих настроек

hadoop_config = sc._jsc.hadoopConfiguration()
hadoop_config.set("fs.s3.awsAccessKeyId", dl_access_key)
hadoop_config.set("fs.s3.awsSecretAccessKey", dl_secret_key)
hadoop_config.set("fs.s3.awsSessionToken", dl_session_key)

spark_df_csv = spark_session.read.option("Header", "True").csv("s3://somepath")

Запись:

Я использую протокол s3a s3a://some_bucket/

Это работает, но иногда я вижу

  1. _временная папка присутствует в корзине s3 и не все CSV преобразуются в паркет
  2. Когда я включаю параллелизм EMR в 256 (EMR-5.28 ) и отправьте 10 0 заданий это я получаю _временная ошибка переименования.

Проблемы:

  1. Этот метод создает временную папку, а иногда и не удаляет ее. Я могу видеть _teвременную папку в s3 bucket.
  2. , когда я включаю EMR-параллелизм (последняя версия EMR5.28), она позволяет выполнять шаги параллельно, я получаю переименование _teorary error для некоторых файлов.

МЕТОД 2:

Мне кажется, что s3a не подходит для параллельной работы. Поэтому я хочу читать и писать, используя fs.s3, так как он имеет лучший файл commiters .

Так что я сделал это изначально, я установил конфигурацию oop, как указано выше, для учетной записи A, а затем отключил конфигурации, так что он может получить доступ к учетной записи по умолчанию B в конце концов S3 Bucket. Таким образом

hadoop_config = sc._jsc.hadoopConfiguration()
hadoop_config.unset("fs.s3.awsAccessKeyId")
hadoop_config.unset("fs.s3.awsSecretAccessKey")
hadoop_config.unset("fs.s3.awsSessionToken")


spark_df_csv.repartition(1).write.partitionBy(['org_id', 'institution_id']). \
    mode('append').parquet(write_path)

Проблемы :

Это работает, но проблема, скажем, если я запускаю лямбду, которая, в свою очередь, отправляет задание для 100 файлов (в l oop) около 10 нечетных файлов приводят к отказу в доступе при записи файла в корзину s3.

java .util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor. java: 624) \ n ... еще 1 \ nПричинено: com.amazon.ws.emr. имел oop .fs.shaded.com.amazon aws .services.s3.model.AmazonS3Exception: доступ запрещен (Служба:

Это может быть из-за того, что эта неустановка иногда не работает или из-за параллельного запуска сброса контекста / набора сеансов Spark, происходящего при распараллеливании? Я имею в виду, что спарк-контекст для одного задания сбрасывает конфигурацию oop, а другой задает ее, что может вызвать эту проблему, хотя и не уверен, как контекст спарка работает в параллельно.

Разве у каждой работы нет отдельного контекста и сеанса Spark. Пожалуйста, предложите альтернативы для моей ситуации.

...