Есть ли способ использовать динамическое индексирование через "es.index.write = {index} / type" в методе writeStream при потоковой передаче с искрой? - PullRequest
0 голосов
/ 28 марта 2019

Я использую структурированную потоковую передачу Spark для рекурсивного чтения многих файлов и их записи вasticsearch. Я хочу знать, если есть способ создать индекс во время выполнения в соответствии с именем файла искры чтения из папки. Предположим, что Spark прочитал файл из директории client1, поэтому он должен индексировать его вasticsearch как client1.

Я прочитал эту статью , они сделали это довольно легко, но проблема в том, что я использую структурированную потоковую передачу Spark, в которой нет метода SavetoEs вместо метода WriteStream.

То, что я хочу сделать, это рекурсивное чтение файлов из разных каталогов, и когда я запишу их вasticsearch, я хочу проиндексировать их вasticsearch как, например, file:/home/usr/client/message/single, поэтому он должен быть проиндексирован как "client-message-single" в asticsearh на основе пути к файлу.

file_path= file://home/usr/client/email/message/singlemesage
file_split_path= file_path.split('/')
index = str(file_split_path[-4] + '-' + file_split_path[-3] + '-' + file_split_path[-2])
myjson['doc-index'] =index       //adding key value in my json 
df=myjson.writeStream.option("es.resource.write","{doc-index}/default").outputMode(""append").format("org.elasticsearch.spark.sql").start()
df.awaitTermination()

Но, используя описанную выше технику, я получаю следующую ошибку:

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: не удается найти соответствует для {doc-index} / default в org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry (BulkEntryWriter.java:136) в org.elasticsearch.hadoop.rest.RestRepository.writeToIndex (RestRepository.java:170) в org.elasticsearch.spark.rdd.EsRDDWriter.write (EsRDDWriter.scala: 74) в org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.run (EsStreamQueryWriter.scala: 41) в org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink $$ anonfun $ addBatch $ 2 $$ anonfun $ 2.Apply (EsSparkSqlStreamingSink.scala: 52) в org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink $$ anonfun $ addBatch $ 2 $$ anonfun $ 2.Apply (EsSparkSqlStreamingSink.scala: 51)

в json есть "doc-index": "client-message-singlemessage", который читать потоковой струйной искрой.

...