Ошибка разбора Elasticsearch Spark - невозможно проанализировать значение [X] для поля [Y] - PullRequest
2 голосов
/ 11 мая 2019

Я использую Spark 2.3 (Pyspark) для чтения данных из индекса Elasticsearch 6.6.
Задание Spark пытается создать df и завершается ошибкой:

Код Spark:

df = spark.read.format("org.elasticsearch.spark.sql").option("es.resource.read", index_name).option("es.nodes", hosts).load()

Сообщение об ошибке:

org.elasticsearch.hadoop.rest.EsHadoopParsingException: Cannot parse value [2019/05/06 19:31:21] for field [GenerateTime]

Я полагаю, что это отчасти связано с отсутствием формата даты источникараспознанный формат ISO 8601 .

Также, читая документы Отображение времени / даты , я понимаю, что это можно решить путем создания сопоставления, но это повлияет только на новыеиндексы и не будут изменять отображение исторических индексов.

Вопрос:

Есть ли способ решить эту проблему, чтобы я мог успешно читать из исторических индексов черезИскра (например, перед любыми изменениями отображения, которые могут потребоваться)?Я тоже безуспешно пытался .option("es.mapping.date.rich", False).

1 Ответ

1 голос
/ 13 мая 2019

Я создал образец документа на основе ваших данных в версии ES 6.4 / Spark 2.1 и использовал приведенный ниже код, чтобы прочитать поле GenerateTime как text вместо датывведите искру.

Отображение в ES

PUT somedateindex
{
  "mappings": {
    "mydocs":{
      "properties": {
        "GenerateTime": {
          "type": "date",
          "format": "yyyy/MM/dd HH:mm:ss"
        }
      }
    }
  }
}

Обратите внимание, что поле имеет тип date в ES.

Spark Code для использования поля даты в ES в качестве строки

Обратите внимание, что я использовал опцию config ("es.mapping.date.rich" , false )

    val spark = SparkSession
                .builder()
                .appName("Spark SQL basic example")
                .config("spark.master", "local")
                .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._

    val df = spark.read.format("org.elasticsearch.spark.sql")
            .option("es.resource.read","somedateindex")
            .option("es.nodes", "some_host_name")
            .option("es.mapping.date.rich", false)
            .option("es.port","9200")
            .load()

   df.show()
   df.printSchema()

Результат кода искры на моей консоли Eclipse:

19/05/13 03:10:53 INFO DAGScheduler: Job 1 finished: show at Elasticsearch.scala:134, took 9.424294 s
19/05/13 03:10:53 INFO CodeGenerator: Code generated in 21.256205 ms
+-------------------+
|       GenerateTime|
+-------------------+
|2019/05/06 19:31:21|
+-------------------+

root
 |-- GenerateTime: string (nullable = true)

19/05/13 03:10:53 INFO SparkUI: Stopped Spark web UI at....

Обратите внимание, что printSchema показывает, что в таблице есть один столбец GenerateTime, которыйимеет тип string.

Если вы не хотите идти дальше и менять сопоставления, вышеприведенное должно вам помочь.

Я рекомендую иметь поля даты в формате даты, а не в тексте, и это тоже в формате, поддерживаемом ISO-8601, чтобы при выводе типа в итоге вы получали данные в правильном типе в Spark и моглипросто сосредоточьтесь на бизнес-логике, часто правильное решение заключается в том, как мы храним данные, а не в том, как мы их обрабатываем.

Spark-код для преобразования String в Timestamp / Date

Однако, если по какой-то причине вы не можете изменить сопоставления из источника, то естьasticsearch, вы можете дополнительно добавить приведенный ниже код для преобразованиястроковое значение в метку времени, используя приведенный ниже код:

    import org.apache.spark.sql.functions._

    //String into Timestamp Transformation
    val df2_timestamp = df.withColumn("GenerateTime_timestamp",  from_unixtime(unix_timestamp($"GenerateTime", "yyyy/MM/dd HH:mm:ss")).cast(TimestampType))
    df2_timestamp.show(false)
    df2_timestamp.printSchema();

Если вы запустите приведенный выше код, вы увидите вывод, показанный ниже:

19/05/14 11:33:10 INFO CodeGenerator: Code generated in 23.742359 ms
+-------------------+----------------------+
|GenerateTime       |GenerateTime_timestamp|
+-------------------+----------------------+
|2019/05/06 19:31:21|2019-05-06 19:31:21.0 |
+-------------------+----------------------+

root
 |-- GenerateTime: string (nullable = true)
 |-- GenerateTime_timestamp: timestamp (nullable = true)

19/05/14 11:33:10 INFO SparkContext: Invoking stop() from shutdown hook

Также обратите внимание, что мое решениев Скале.Дайте мне знать, если это поможет!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...