memsqlConnector создает исключение с потоковой структурой spark 2.4 - PullRequest
0 голосов
/ 24 февраля 2019

Я использую memsql-spark-connector 2.0.6 в приложении Spark Structured Streaming.

val map: Map[String, String] = Map(
("spark.memsql.host" -> "XXX",
"spark.memsql.port" -> "3306",
"spark.memsql.user" -> "ABC",
"spark.memsql.password" -> "YYY",
"spark.memsql.defaultDatabase" -> "ZZZ")
var sbuilder = SparkSession
  .builder()
  .config(sConf)
  .config("spark.sql.autoBroadcastJoinThreshold", "-1")
  .config("spark.sql.crossJoin.enabled", "true")

map.foreach(f => {
  sbuilder = sbuilder.config(f._1, f._2)
}
)
val eventsToMemSql = //readStream

eventsToMemSql.writeStream.option("checkpointLocation", "/checkpoint/events").foreachBatch{(df, batchId) => {
  df.persist()
  df.write.format("com.memsql.spark.connector").mode("append").save("tablename")
  df.unpersist()
}}.start

Я установил параметры memsql в объекте sparkConf.

Это дает следующее исключение:

Диагностика: Исключение класса пользователя: org.apache.spark.sql.streaming.StreamingQueryException: Невозможно создать PoolableConnectionFactory (допустимое свойство соединения zeroDateTimeBehavior)значения: «CONVERT_TO_NULL», «EXCEPTION» или «ROUND». Значение «convertToNull» недопустимо.)

Запись успешна в другом пакетном приложении spark2.2.

Это свойство установлено в исходном коде MemSqlConnectionPool.scala :

newPool.addConnectionProperty("zeroDateTimeBehavior", "convertToNull")

Исходный код в com.mysql.cj.conf.PropertyDefinitions проверяет CONVERT_TO_NULL

Пожалуйста, предложите, как это исключение можно преодолеть.

...