Почему у меня заканчивается память при добавлении массовых документов в упругий поиск с помощью массовых помощников? - PullRequest
0 голосов
/ 18 октября 2019

Я конвертирую файлы .avro в формат JSON, затем анализирую конкретные элементы данных, которые будут проиндексированы в моем кластере упругого поиска. Каждый блок содержит примерно 1,8 гигабайта данных и около 500 блоков. Недостаточно памяти не займет много времени, но я подумал, что это именно то, для чего нужны основные помощники в библиотеке эластичного поиска.

Мне не хватает некоторых ключевых деталей здесь?

def connect_elasticsearch(address):
    es = Elasticsearch([address], verify_certs=False)
    if not es.ping():
        raise ValueError("Connection failed")
    return es

#function for running unix command line from python
def run_cmd(args_list):
        print(str(datetime.datetime.now())[:19]+': Running system command: {0}'.format(' '.join(args_list)))
        proc = subprocess.Popen(args_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        s_output, s_err = proc.communicate()
        s_return =  proc.returncode
        return s_return, s_output, s_err
def gendata(path):
  cat = subprocess.Popen(["hdfs", "dfs", "-cat", path], stdout=subprocess.PIPE)
  avro_reader = reader(cat.stdout)
  for record in avro_reader:
    yield {
      '_index': 'beta_homepage_survey_2',
      "hostname": record['hostname'],
      "age": record['ts'],
      "text": record['text'],
      "metadata": record['metadata'],
      "source":path}
es = connect_elasticsearch('http://myurl:9200/')
#find all avro files for the homepage survey via the command line (hdfs commands)
(ret, out, err)= run_cmd(['hdfs', 'dfs', '-ls', '/homepage_survey/chunks/*/*.avro'])
lines = out.split('\n')
for line in lines:
  try:
    line = str('/' + line.split(" /")[1])
    print(str(datetime.datetime.now())[:19]+": Indexing File: "+ line)
    helpers.bulk(es,gendata(line))
  except Exception as ex:
    print(str(datetime.datetime.now())[:19] + ": *** Error indexing chunk:- "+ type(ex).__name__)
    continue
print(str(datetime.datetime.now())[:19]+ ": Indexing Complete...")
...