Я использую memsql-spark-connector 2.0.6 в приложении Spark Structured Streaming.
val map: Map[String, String] = Map(
("spark.memsql.host" -> "XXX",
"spark.memsql.port" -> "3306",
"spark.memsql.user" -> "ABC",
"spark.memsql.password" -> "YYY",
"spark.memsql.defaultDatabase" -> "ZZZ")
var sbuilder = SparkSession
.builder()
.config(sConf)
.config("spark.sql.autoBroadcastJoinThreshold", "-1")
.config("spark.sql.crossJoin.enabled", "true")
map.foreach(f => {
sbuilder = sbuilder.config(f._1, f._2)
}
)
val eventsToMemSql = //readStream
eventsToMemSql.writeStream.option("checkpointLocation", "/checkpoint/events").foreachBatch{(df, batchId) => {
df.persist()
df.write.format("com.memsql.spark.connector").mode("append").save("tablename")
df.unpersist()
}}.start
Я установил параметры memsql в объекте sparkConf
.
Это дает следующее исключение:
Диагностика: Исключение класса пользователя: org.apache.spark.sql.streaming.StreamingQueryException: Невозможно создать PoolableConnectionFactory (допустимое свойство соединения zeroDateTimeBehavior)значения: «CONVERT_TO_NULL», «EXCEPTION» или «ROUND». Значение «convertToNull» недопустимо.)
Запись успешна в другом пакетном приложении spark2.2.
Это свойство установлено в исходном коде MemSqlConnectionPool.scala :
newPool.addConnectionProperty("zeroDateTimeBehavior", "convertToNull")
Исходный код в com.mysql.cj.conf.PropertyDefinitions проверяет CONVERT_TO_NULL
Пожалуйста, предложите, как это исключение можно преодолеть.