Я загружаю данные из aws s3, затем распаковываю, форматирую данные и пытаюсь отправить этот разархивированный отформатированный файл в awsasticsearch.
Для каждого файла он создает его на моем локальном компьютере как «access.log», удаляет его после распаковки, форматирования, отправки в awsasticsearch, а затем повторяет процесс.
Однако, когда я пытаюсьчтобы сделать это с тем же именем, это говорит мне, что я не могу удалить при работе с файлом, я не хочу создавать несколько файлов. Какой самый эффективный способ сделать это?
ПРИМЕЧАНИЕ: acc_logs - это список многих файлов журнала. Например: acc_log_file.gz
вот мой код:
for log_file in acc_logs: # one file
bucket.download_file(log_file, 'C:\\Users\\name\\Desktop\\s3-to-es\\access.log')
with gzip.open('C:\\Users\\name\\Desktop\\s3-to-es\\access.log') as log_file:
for line in log_file:
line = line.decode("utf-8") # decode byte to str
try:
ip = ip_pattern.search(line).group(0)
except:
ip = None
try:
host = host_pattern.search(line).group(0)[1:-1]
except:
host = None
date = time_pattern.search(line).group(0)[1:-1]
pos = [x.end() for x in re.finditer('"', line)]
request = line[pos[0]:pos[1]-1]
referr_url = line[pos[2]:pos[3]-1]
user_agent = line[pos[4]:pos[5]-1]
status, bytes = line[pos[1]+1:pos[2]-2].split()
ident, authuser = authuser_pattern.search(line).group(0).split()
document = {"server_ip": ip,
"host":host,
"ident":ident,
"authuser":authuser,
"date": date,
"request":request,
"status":status,
"bytes":bytes,
"referr_url":referr_url,
"user_agent":user_agent
}
ToElasticSearch(document) #upload each line to es
if count == 10:
break
os.remove('C:\\Users\\name\\Desktop\\s3-to-es\\access.log')
ToElasticSearch выглядит так:
def ToElasticSearch(document):
"""
upload given document to specified index in AWS
"""
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
es = Elasticsearch(
hosts = [{'host': es_host, 'port': 443}],
http_auth = awsauth,
use_ssl = True,
verify_certs = True,
connection_class = RequestsHttpConnection
)
es.index(index="acc_logs", doc_type="_doc", body=document)