Я конвертирую файлы .avro в формат JSON, затем анализирую конкретные элементы данных, которые будут проиндексированы в моем кластере упругого поиска. Каждый блок содержит примерно 1,8 гигабайта данных и около 500 блоков. Недостаточно памяти не займет много времени, но я подумал, что это именно то, для чего нужны основные помощники в библиотеке эластичного поиска.
Мне не хватает некоторых ключевых деталей здесь?
def connect_elasticsearch(address):
es = Elasticsearch([address], verify_certs=False)
if not es.ping():
raise ValueError("Connection failed")
return es
#function for running unix command line from python
def run_cmd(args_list):
print(str(datetime.datetime.now())[:19]+': Running system command: {0}'.format(' '.join(args_list)))
proc = subprocess.Popen(args_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
s_output, s_err = proc.communicate()
s_return = proc.returncode
return s_return, s_output, s_err
def gendata(path):
cat = subprocess.Popen(["hdfs", "dfs", "-cat", path], stdout=subprocess.PIPE)
avro_reader = reader(cat.stdout)
for record in avro_reader:
yield {
'_index': 'beta_homepage_survey_2',
"hostname": record['hostname'],
"age": record['ts'],
"text": record['text'],
"metadata": record['metadata'],
"source":path}
es = connect_elasticsearch('http://myurl:9200/')
#find all avro files for the homepage survey via the command line (hdfs commands)
(ret, out, err)= run_cmd(['hdfs', 'dfs', '-ls', '/homepage_survey/chunks/*/*.avro'])
lines = out.split('\n')
for line in lines:
try:
line = str('/' + line.split(" /")[1])
print(str(datetime.datetime.now())[:19]+": Indexing File: "+ line)
helpers.bulk(es,gendata(line))
except Exception as ex:
print(str(datetime.datetime.now())[:19] + ": *** Error indexing chunk:- "+ type(ex).__name__)
continue
print(str(datetime.datetime.now())[:19]+ ": Indexing Complete...")