Я использую структурированную потоковую передачу Spark для рекурсивного чтения многих файлов и их записи вasticsearch. Я хочу знать, если есть способ создать индекс во время выполнения в соответствии с именем файла искры чтения из папки. Предположим, что Spark прочитал файл из директории client1, поэтому он должен индексировать его вasticsearch как client1.
Я прочитал эту статью , они сделали это довольно легко, но проблема в том, что я использую структурированную потоковую передачу Spark, в которой нет метода SavetoEs
вместо метода WriteStream
.
То, что я хочу сделать, это рекурсивное чтение файлов из разных каталогов, и когда я запишу их вasticsearch, я хочу проиндексировать их вasticsearch как, например, file:/home/usr/client/message/single
, поэтому он должен быть проиндексирован как "client-message-single" в asticsearh на основе пути к файлу.
file_path= file://home/usr/client/email/message/singlemesage
file_split_path= file_path.split('/')
index = str(file_split_path[-4] + '-' + file_split_path[-3] + '-' + file_split_path[-2])
myjson['doc-index'] =index //adding key value in my json
df=myjson.writeStream.option("es.resource.write","{doc-index}/default").outputMode(""append").format("org.elasticsearch.spark.sql").start()
df.awaitTermination()
Но, используя описанную выше технику, я получаю следующую ошибку:
org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: не удается найти
соответствует для {doc-index} / default в
org.elasticsearch.hadoop.serialization.bulk.BulkEntryWriter.writeBulkEntry (BulkEntryWriter.java:136)
в
org.elasticsearch.hadoop.rest.RestRepository.writeToIndex (RestRepository.java:170)
в
org.elasticsearch.spark.rdd.EsRDDWriter.write (EsRDDWriter.scala: 74)
в
org.elasticsearch.spark.sql.streaming.EsStreamQueryWriter.run (EsStreamQueryWriter.scala: 41)
в
org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink $$ anonfun $ addBatch $ 2 $$ anonfun $ 2.Apply (EsSparkSqlStreamingSink.scala: 52)
в
org.elasticsearch.spark.sql.streaming.EsSparkSqlStreamingSink $$ anonfun $ addBatch $ 2 $$ anonfun $ 2.Apply (EsSparkSqlStreamingSink.scala: 51)
в json есть "doc-index": "client-message-singlemessage", который
читать потоковой струйной искрой.