Как избежать »- зарезервированное ключевое слово, которое нельзя использовать в качестве имени поля« ошибка в Spark SQL и структурированной потоковой передаче ». - PullRequest
0 голосов
/ 29 марта 2019

В настоящее время, когда я использовал Structured Streaming v2.1.0 + Kafka v0.10 для обработки журналов в реальном времени, я получил исключение в потоке "main" java.lang.UnsupportedOperationException: `package` является зарезервированным ключевым словом и не может использоваться в качестве поля имя

Моя задача потребовала двух логических частей:

часть # 1. перевести сообщение журнала, содержащее некоторую строку json fromat, в соответствующий класс дел с помощью net.liftweb.json

один из моих классов дел, как определено ниже:

case class Mobile(val title: Option[String],
                  val desc: Option[String],
                  val adtype: Option[Int],
                  val apkname: Option[String],
                  @transient val `package`: Option[String],
                  val appstoreid: Option[String]
                 ) extends java.io.Serializable

часть # 2. используйте структурированную потоковую передачу v2.1.0 + kafka v0.10 для обработки в реальном времени:

    val spark: SparkSession = SparkSession.
      builder().
      appName("structured streaming test").
      getOrCreate()

    val df = spark.
      readStream.
      format("kafka").
      option("kafka.bootstrap.servers", "localhost:9092").
      option("subscribe", "stream1").
      option("maxOffsetPerTrigger", "10000").
      load()

    import spark.implicits._

    val ds = df.

      //change the value's type from binary to STRING
      selectExpr("CAST(value AS STRING)").
      as[String].
      map(myLogicToProcessLogs)

    val query = ds.
      writeStream.
      outputMode("append").
//      format("console").
      trigger(ProcessingTime("10 seconds")).
      foreach(new HttpSink).
      start()

    query.awaitTermination()

Я получил причину ошибки в том, что в моих сообщениях журнала есть зарезервированное ключевое слово java, например «пакет», которое всегда не срабатывало из-за Spark SQL encoder

Примечание. Используя `package` для избежания проверки ключевых слов в scala и используя ключевое слово @transient для избежания сериализации java, я могу успешно преобразовать приведенный выше класс case в RDD и выполнить последующее преобразование и действие для пакетной обработки без каких-либо ошибок. незамедлительный. Но как я могу избежать проверки ключевых слов из кодировщика Spark SQL и потоковой передачи Structrued?

Есть один связанный вопрос: spark-submit завершается неудачно, когда поля класса case зарезервированы для ключевых слов java с обратными чертами но я могу сделать это, так как для анализа синтаксиса все еще нужен лифт-веб Json параметр construtor 'package'.

Я также обнаружил, что есть другой инструмент json, такой как Gson, который обеспечивает поддержку именования полей JSON для преобразования стандартных имен полей Java в имя поля Json. Существует ли аналогичный способ на json от liftweb? https://sites.google.com/site/gson/gson-user-guide#TOC-JSON-Field-Naming-Support

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...