Ошибка при выполнении методов S3Hook list_keys или read_key - PullRequest
0 голосов
/ 22 марта 2020

Я получаю это сообщение об ошибке: {logging_mixin.py:112} INFO - [2020-03-22 12:34:53,672] {local_task_job.py:103} INFO - Task exited with return code -6 , когда я использую list_keys или read_key методы перехвата S3. Хотя метод get_credentials работает нормально. Искал и не могу найти, почему это происходит.

Я использую apache-airflow==1.10.9, boto3==1.12.21, botocore==1.15.21

Вот мой код для моего пользовательского оператора, который использует Крюк S3:

class SASValueToRedshiftOperator(BaseOperator):
    """Custom Operator for extracting data from SAS source code.
    Attributes:
        ui_color (str): color code for task in Airflow UI.
    """
    ui_color = '#358150'

    @apply_defaults
    def __init__(self,
                 aws_credentials_id="",
                 redshift_conn_id="",
                 table="",
                 s3_bucket="",
                 s3_key="",
                 sas_value="",
                 columns="",
                 *args, **kwargs):
        """Extracts label mappings from SAS source code and store as Redshift table
        Args:
            aws_credentials_id (str): Airflow connection ID for AWS key and secret.
            redshift_conn_id (str): Airflow connection ID for redshift database.
            table (str): Name of table to load data to.
            s3_bucket (str): S3 Bucket Name Where SAS source code is store.
            s3_key (str): S3 Key Name for SAS source code.
            sas_value (str): value to search for in sas file for extraction of data.
            columns (list): resulting data column names.
        Returns:
            None
        """
        super(SASValueToRedshiftOperator, self).__init__(*args, **kwargs)
        self.aws_credentials_id = aws_credentials_id
        self.redshift_conn_id = redshift_conn_id
        self.table = table
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.sas_value = sas_value
        self.columns = columns

    def execute(self, context):
        """Executes task for staging to redshift.
        Args:
            context (:obj:`dict`): Dict with values to apply on content.
        Returns:
            None   
        """
        s3 = S3Hook(self.aws_credentials_id)
        redshift_conn = BaseHook.get_connection(self.redshift_conn_id)

        self.log.info(s3)
        self.log.info(s3.get_credentials())
        self.log.info(s3.list_keys(self.s3_bucket))
...