Pyspark S3A Отказано в доступе Исключение для кросс-аккаунта STS принимает на себя роль - PullRequest
0 голосов
/ 13 октября 2019

Я настроил задание AWS Glue для обработки файлов S3, присутствующих в другой учетной записи AWS B. Роль IAM в учетной записи A (роль IAM связующего задания) использует STS для принятия роли в учетной записи B, которая обеспечивает доступ к моим нужным файлам. У роли IAM учетной записи B есть отношение доверия к роли задания Glue в учетной записи A. Мне удалось распечатать ключ доступа и секретный ключ, поэтому при условии, что служба STS работает хорошо.

Я получаю сообщение об ошибке ниже:

An error occurred while calling o83.json. com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied;

Какова правильная реализация разъема S3A, когда я получаю исключение «Отказано в доступе».

Вот мой код:

from __future__ import print_function
from pyspark import SparkContext 
from pyspark import SparkConf
from pyspark import SQLContext 
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import col
from pyspark.sql.types import *
from pyspark.sql import HiveContext
from pyspark.sql.functions import explode
from pyspark.sql.functions import explode_outer
from pyspark.sql.functions import substring_index
from pyspark.sql.functions import input_file_name
from pyspark.sql import functions as f 
import sys
import os
import os
import boto3
import sys
import errno
import time
import datetime
from datetime import timedelta, date
from pyspark.sql.functions import split
from pyspark.sql.functions import substring
from boto3.session import Session

spark = SparkSession\
        .builder\
        .appName("JsonInputFormat")\
        .enableHiveSupport()\
        .getOrCreate()\

sc = spark.sparkContext

hive_context = HiveContext(sc)

hive_context.setConf("hive.exec.dynamic.partition", "true")
hive_context.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
hive_context.setConf("hive.serialization.extend.nesting.levels","true")


sqlCtx = HiveContext(sc)

client = boto3.client('sts')
response = client.assume_role(RoleArn='ROLE_TO_ASSUME', RoleSessionName='AssumeRoleSession1')
credentials = response['Credentials']


ACCESS_KEY = credentials['AccessKeyId']
SECRET_KEY = credentials['SecretAccessKey']
print('access key is {}'.format(ACCESS_KEY))
print('secret key is {}'.format(SECRET_KEY))
print("Hadoop version: " + sc._gateway.jvm.org.apache.hadoop.util.VersionInfo.getVersion())
session = Session(aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY)
s3      = session.resource('s3')

spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", ACCESS_KEY)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", SECRET_KEY)
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3a.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3-us-east-1.amazonaws.com")
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")


def flatten_schema(schema):
    """Take schema as returned from schema().jsonValue()
    and return list of field names with full path"""
    def _flatten(schema, path="", accum=None):
        # Extract name of the current element
        name = schema.get("name")
        # If there is a name extend path
        if name is not None:
            path = "{0}.{1}".format(path, name) if path else name
            print('path is {}'.format(path))
        # It is some kind of struct
        if isinstance(schema.get("fields"), list):
            for field in schema.get("fields"):
                _flatten(field, path, accum)
        elif isinstance(schema.get("type"), dict):
            _flatten(schema.get("type"), path, accum)
        # It is an atomic type
        else:
            accum.append(path)
    accum = []
    _flatten(schema, "", accum)
    return  accum

sqlCtx.sql("set spark.sql.caseSensitive=true")

yesterday = date.today() - timedelta(1)
daybefore=yesterday.strftime("%Y-%m-%d")
currentdate=time.strftime("%Y-%m-%d")
key = 'KEY={}'.format(str(daybefore))
bucket = 'BUKCET_NAME'
df_base=spark.read.json('s3a://{}/{}/*/'.format(bucket,key))

base_schema=df_base.schema

datePrefix=str(daybefore)
source='s3a://{}/{}'.format(bucket,key)

df1=spark.read.json(source,schema=base_schema)


schema=df1.schema.jsonValue()
columns_list=flatten_schema(schema)

print('columns list is {}'.format(columns_list))


df2 = df1.select(*(col(x).alias(x.replace('.','_')) for x in columns_list))
print('df2 is {}'.format(df2))
df3=df2.select("*",explode_outer(df2.contents).alias("contents_flat"))
df3=df3.drop("contents")
print('df3 is {}'.format(df3))


schema4=df3.schema.jsonValue()
columns_list4=flatten_schema(schema4)
print('columns list 4 is {}'.format(columns_list4))
df5 = df3.select(*(col(x).alias(x.replace('.','_')) for x in columns_list4))
print('df5 is {}'.format(df5))

schema5=df5.schema.jsonValue()
columns_list5=flatten_schema(schema5)
print('columns list 5 is {}'.format(columns_list5))
df6 = df5.select(*(col(x).alias(x.replace('contents_flat','contents')) for x in columns_list5))
print('df6 is {}'.format(df6))

schema6=df6.schema.jsonValue()
columns_list6=flatten_schema(schema6)
print('column list 6 is {}'.format(columns_list6))
df7 = df6.select(*(col(x) for x in columns_list6)) #above line edited down here

schema7=df7.schema.jsonValue()
print('schema7 is {}'.format(schema7))
columns_list7=flatten_schema(schema7)
print('columns list 7 is {}'.format(columns_list7))
df7 = df7.select(*(col(x).alias(x.replace('.','_')) for x in columns_list7))
df7=df7.select("*",explode_outer(df7.business_address_latLng_warnings).alias("business_address_latLng_warnings_flat"))
df7=df7.drop("business_address_latLng_warnings")

print('df7 is {}'.format(df7))

df8 = df7.withColumn("filename",input_file_name())
split_col = split(df8['filename'], 'short_date=')
df9 = df8.withColumn('shortfilename', split_col.getItem(1))
df_final = df9.withColumn('filedate', substring('shortfilename',1,10)).drop('shortfilename')
print('df_final is {}'.format(df_final))

df_final.write.mode('append').csv('s3://{bucket}/{folder1}/', header='true')

spark.stop()```
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...