I'm getting this error message:
{logging_mixin.py:112} INFO - [2020-03-22 12:34:53,672] {local_task_job.py:103} INFO - Task exited with return code -6
whenever I use the S3 Hook's list_keys or read_key methods. The get_credentials method works fine, though. I've searched and can't find out why this happens.
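In isolation, the failing calls are roughly equivalent to this sketch (the connection ID "aws_credentials" and the bucket/key names here are placeholders, not my real values):

    from airflow.hooks.S3_hook import S3Hook

    # Placeholder connection ID and names, just to show which calls are involved.
    s3 = S3Hook("aws_credentials")
    print(s3.get_credentials())                 # works fine
    print(s3.list_keys("my-bucket"))            # task exits with return code -6
    print(s3.read_key("my-key", "my-bucket"))   # same failure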
I'm using apache-airflow==1.10.9, boto3==1.12.21, and botocore==1.15.21.
Here is the code for my custom operator that uses the S3 Hook:
from airflow.hooks.base_hook import BaseHook
from airflow.hooks.S3_hook import S3Hook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SASValueToRedshiftOperator(BaseOperator):
    """Custom Operator for extracting data from SAS source code.

    Attributes:
        ui_color (str): color code for task in Airflow UI.
    """
    ui_color = '#358150'

    @apply_defaults
    def __init__(self,
                 aws_credentials_id="",
                 redshift_conn_id="",
                 table="",
                 s3_bucket="",
                 s3_key="",
                 sas_value="",
                 columns="",
                 *args, **kwargs):
        """Extracts label mappings from SAS source code and stores them as a Redshift table.

        Args:
            aws_credentials_id (str): Airflow connection ID for AWS key and secret.
            redshift_conn_id (str): Airflow connection ID for Redshift database.
            table (str): Name of table to load data to.
            s3_bucket (str): S3 bucket name where the SAS source code is stored.
            s3_key (str): S3 key name for the SAS source code.
            sas_value (str): Value to search for in the SAS file for extraction of data.
            columns (list): Resulting data column names.

        Returns:
            None
        """
        super(SASValueToRedshiftOperator, self).__init__(*args, **kwargs)
        self.aws_credentials_id = aws_credentials_id
        self.redshift_conn_id = redshift_conn_id
        self.table = table
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.sas_value = sas_value
        self.columns = columns

    def execute(self, context):
        """Executes task for staging to Redshift.

        Args:
            context (:obj:`dict`): Dict with values to apply on content.

        Returns:
            None
        """
        s3 = S3Hook(self.aws_credentials_id)
        redshift_conn = BaseHook.get_connection(self.redshift_conn_id)
        self.log.info(s3)
        self.log.info(s3.get_credentials())
        self.log.info(s3.list_keys(self.s3_bucket))
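For context, this is roughly how I wire the operator into a DAG (the DAG ID, connection IDs, bucket, key, and column names below are placeholders, not my actual values):

    from datetime import datetime

    from airflow import DAG

    # Placeholder DAG; the operator class above is assumed to be importable.
    dag = DAG("sas_to_redshift_example",
              start_date=datetime(2020, 3, 1),
              schedule_interval=None)

    load_sas_labels = SASValueToRedshiftOperator(
        task_id="load_sas_labels",
        dag=dag,
        aws_credentials_id="aws_credentials",
        redshift_conn_id="redshift",
        table="sas_labels",
        s3_bucket="my-bucket",
        s3_key="my_source.sas",
        sas_value="value format",
        columns=["code", "label"],
    )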