У меня есть довольно простая задача индексирования, которая считывает из файла разделенный строкой json, выполняет простые манипуляции с входящими данными, а затем индексирует с использованием эластичного поиска parallel_bulk
. Я использую Python3, ES 6,7 на сервере (2 узла, 16G каждый).
Когда я запускаю приведенный ниже код, я вижу довольно постоянно увеличивающееся использование памяти до тех пор, пока машина не начнет меняться. Данные невелики (<500 символов в строке), поэтому меня удивляет использование памяти. </p>
Я провел небольшую настройку размера очереди, количества потоков и размера чанка, и чувствительность довольно большая. Я хотел бы оптимизировать скорость индексации на клиенте (я выполнил работу на стороне сервера, чтобы ускорить процесс), сохранив при этом достаточную память.
Код:
def yield_records(idx='sra_bulk_test_biosample'):
for fname in list(filter(lambda f: f.endswith('.json.gz'),os.listdir())):
print(fname)
with gzip.open(fname, 'r') as f:
n = 0
for line in f:
n+=1
if(n % 10000 == 0):
print(n)
res = json.loads(line)
yield {
"_op_type": "index", # this is the default
"_index": idx,
"_type": 'doc', # no more doc types after es_6
"_id": res['accession'], # extract _id from record
"_source": res # use entire record as "source"
}
def main():
for success, info in parallel_bulk(es, actions = yield_records(), chunk_size=1000, queue_size=4, thread_count=4, yield_ok = False):
if(not success):
print(info)
if __name__ == '__main__':
main()