В настоящее время, когда я использовал Structured Streaming v2.1.0 + Kafka v0.10 для обработки журналов в реальном времени, я получил исключение в потоке "main" java.lang.UnsupportedOperationException: `package` является зарезервированным ключевым словом и не может использоваться в качестве поля имя
Моя задача потребовала двух логических частей:
часть # 1. перевести сообщение журнала, содержащее некоторую строку json fromat, в соответствующий класс дел с помощью net.liftweb.json
один из моих классов дел, как определено ниже:
case class Mobile(val title: Option[String],
val desc: Option[String],
val adtype: Option[Int],
val apkname: Option[String],
@transient val `package`: Option[String],
val appstoreid: Option[String]
) extends java.io.Serializable
часть # 2. используйте структурированную потоковую передачу v2.1.0 + kafka v0.10 для обработки в реальном времени:
val spark: SparkSession = SparkSession.
builder().
appName("structured streaming test").
getOrCreate()
val df = spark.
readStream.
format("kafka").
option("kafka.bootstrap.servers", "localhost:9092").
option("subscribe", "stream1").
option("maxOffsetPerTrigger", "10000").
load()
import spark.implicits._
val ds = df.
//change the value's type from binary to STRING
selectExpr("CAST(value AS STRING)").
as[String].
map(myLogicToProcessLogs)
val query = ds.
writeStream.
outputMode("append").
// format("console").
trigger(ProcessingTime("10 seconds")).
foreach(new HttpSink).
start()
query.awaitTermination()
Я получил причину ошибки в том, что в моих сообщениях журнала есть зарезервированное ключевое слово java, например «пакет», которое всегда не срабатывало из-за Spark SQL encoder
Примечание. Используя `package` для избежания проверки ключевых слов в scala и используя ключевое слово @transient для избежания сериализации java, я могу успешно преобразовать приведенный выше класс case в RDD и выполнить последующее преобразование и действие для пакетной обработки без каких-либо ошибок. незамедлительный.
Но как я могу избежать проверки ключевых слов из кодировщика Spark SQL и потоковой передачи Structrued?
Есть один связанный вопрос:
spark-submit завершается неудачно, когда поля класса case зарезервированы для ключевых слов java с обратными чертами
но я могу сделать это, так как для анализа синтаксиса все еще нужен лифт-веб Json параметр construtor 'package'.
Я также обнаружил, что есть другой инструмент json, такой как Gson, который обеспечивает поддержку именования полей JSON для преобразования стандартных имен полей Java в имя поля Json. Существует ли аналогичный способ на json от liftweb?
https://sites.google.com/site/gson/gson-user-guide#TOC-JSON-Field-Naming-Support