Я использую Spark в Glue для записи в AWS / ElasticSearch со следующей конфигурацией для Spark:
conf.set("es.nodes", s"$nodes/$indexName")
conf.set("es.port", "443")
conf.set("es.batch.write.retry.count", "200")
conf.set("es.batch.size.bytes", "512kb")
conf.set("es.batch.size.entries", "500")
conf.set("es.index.auto.create", "false")
conf.set("es.nodes.wan.only", "true")
conf.set("es.net.ssl", "true")
, однако я получаю следующую ошибку:
diagnostics: User class threw exception: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot detect ES version - typically this happens if the network/Elasticsearch cluster is not accessible or when targeting a WAN/Cloud instance without the proper setting 'es.nodes.wan.only'
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverClusterInfo(InitializationUtils.java:340)
at org.elasticsearch.spark.rdd.EsSpark$.doSaveToEs(EsSpark.scala:104)
....
Я знаю, в каком "VP C" запущен мой экземпляр ElasticSearch, но я не уверен, как установить его для Glue / Spark или если это другая проблема. Любая идея?
Я также пытался добавить соединение "glue jdb c", которое должно использовать правильное соединение VP C, но я не уверен, как правильно его настроить:
import scala.reflect.runtime.universe._
def saveToEs[T <: Product : TypeTag](index: String, data: RDD[T]) =
SparkProvider.glueContext.getJDBCSink(
catalogConnection = "my-elasticsearch-connection",
options = JsonOptions(
"WHAT HERE?"
),
transformationContext = "SinkToElasticSearch"
).writeDynamicFrame(DynamicFrame(
SparkProvider.sqlContext.createDataFrame[T](data),
SparkProvider.glueContext))