Я пытаюсь передать журналы guardduty из s3 в ElasticSearch. Guardduty помещает журналы в формат .jsonl.gz
Вот код:
from __future__ import print_function
import boto3
import json
import datetime
import gzip
import urllib
import urllib3
import logging
from pprint import pprint
from requests_aws4auth import AWS4Auth
import requests
from io import BytesIO
"""
Can Override the global variables using Lambda Environment Parameters
"""
globalVars = {}
globalVars['Owner'] = "*****"
globalVars['Environment'] = "test"
globalVars['awsRegion'] = "us-east-1"
globalVars['tagName'] = "serverless-s3-to-es-log-ingester"
globalVars['service'] = "es"
globalVars['esIndexPrefix'] = "guarddutylogs-"
globalVars['esIndexDocType'] = "gdutylogsdocs"
globalVars['esHosts'] = {
'test': 'https://********************.us-east-1.es.amazonaws.com' ,
'prod': ''
}
# Initialize Logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def indexDocElement(es_Url, awsauth, docData):
try:
headers = { "Content-Type": "application/json" }
#headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
resp = requests.post(es_Url, auth=awsauth, headers=headers, json=docData)
if resp.status_code == 201:
logger.info('INFO: Successfully inserted element into ES')
else:
logger.error('FAILURE: Unable to index element')
except Exception as e:
logger.error('ERROR: {0}'.format( str(e) ) )
logger.error('ERROR: Unable to index line:"{0}"'.format( str( docData['content'] ) ) )
print (e)
def lambda_handler(event, context):
s3 = boto3.client('s3')
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth( credentials.access_key,
credentials.secret_key,
globalVars['awsRegion'],
globalVars['service'],
session_token=credentials.token
)
logger.info("Received event: " + json.dumps(event, indent=2))
try:
bucket = event['Records'][0]['s3']['bucket']['name']
print('Bucket:' + bucket)
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])
print('Key:' + str(key))
# Get documet (obj) form S3
obj = s3.get_object(Bucket=bucket, Key=key)
except Exception as e:
logger.error('ERROR: {0}'.format( str(e) ) )
logger.error('ERROR: Unable able to GET object:{0} from S3 Bucket:{1}. Verify object exists.'.format(key, bucket) )
if (key.endswith('.gz')) or (key.endswith('.tar.gz')):
mycontentzip = gzip.GzipFile(fileobj=BytesIO(obj['Body'].read())).read()
lines = mycontentzip.decode("utf-8").replace("'", '"')
print('unziped file')
else:
lines = obj['Body'].read().decode("utf-8").replace("'", '"')
logger.info('SUCCESS: Retreived object from S3')
# Split (S3 object/Log File) by lines
lines = lines.splitlines()
if (isinstance(lines, str)):
lines = [lines]
# Index each line to ES Domain
date = datetime.datetime.now().strftime("%Y.%m.%d")
indexName = globalVars['esIndexPrefix'] + date
es_Url = globalVars['esHosts'].get('test') + '/' + indexName + '/' + globalVars['esIndexDocType']
docData = {}
docData['objectKey'] = str(key)
docData['createdDate'] = str(obj['LastModified'])
docData['content_type'] = str(obj['ContentType'])
docData['content_length'] = str(obj['ContentLength'])
for line in lines:
docData['content'] = str(line)
docData['content'] = json.loads(docData['content'] )
indexDocElement(es_Url, awsauth, docData )
logger.info('SUCCESS: Successfully indexed the entire doc into ES')
if __name__ == '__main__':
lambda_handler(None, None)
Вот мой вывод:
ERROR: An error occurred (AccessDenied) when calling the GetObject operation: Access Denied
ERROR: Unable able to GET object:AWSLogs/<Account ID>/GuardDuty/eu-west-2/2020/02/27/b60d8505-5d06-3c75-b4f6-e62cbe01094b.jsonl.gz
Лямбда-политика IAM имеет: S3 policy:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": "s3:*",
"Resource": [
"arn:aws:s3:::<Bucket name>/*",
"arn:aws:s3:::<Bucket name>"
]
}
]
}
Лямбда-роль выполнения:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
Тогда Elasticsearch и VPCFull access.
Политика S3 Bucket:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Allow GuardDuty to use the getBucketLocation operation",
"Effect": "Allow",
"Principal": {
"Service": "guardduty.amazonaws.com"
},
"Action": "s3:GetBucketLocation",
"Resource": "<Given s3 bucket ARN>/*"
},
{
"Sid": "Allow GuardDuty to upload objects to the bucket",
"Effect": "Allow",
"Principal": {
"Service": "guardduty.amazonaws.com"
},
"Action": "s3:PutObject",
"Resource": "<Given s3 bucket ARN>/*"
},
{
"Sid": "Statement1",
"Effect": "Allow",
"Principal": {
**"AWS": "<Given Lambda ARN>/*"** --> *added this.Getting invalid policy as result*
},
"Action": "s3:*",
"Resource": "<Given s3 bucket ARN>/*"
}
]
}
Пожалуйста, помогите мне с этой ошибкой, пробовал много способов, но не уверен, в чем проблема