Python 2.7
Pyspark 2.2.1
JDBC format for MySQL->Spark DF
For writing Spark DF-> AWS Redshift i am using the `Spark-Redshift` driver from Databricks.
Я читаю данные в Spark из таблиц MySQL для моего приложения, из-за контекста и в зависимости от входных аргументов, мне нужно получить либо все записи, которые были обновлены до сегодняшнего дня, либо только записи, которые были обновлены до запрошенной даты ( входит в комплект).
spark.read.format("jdbc")
.option("url", "url")
.option("driver", driver)
.option("dbtable", query)
.load()
и запрос
if days > 0:
get_date = date.today() - timedelta(days)
query = "(SELECT * FROM {} WHERE CAST({}.updatedAt AS date) >= DATE('{}') " \
"AND CAST({}.updatedAt AS date) < CURDATE()) AS t".format(table, table, get_date, table)
elif days == 0:
query = "(SELECT * FROM {} WHERE CAST({}.updatedAt AS date) < CURDATE() " \
"OR updatedAt IS NULL) AS t".format(table, table)
Этот столбец отметок времени отбрасывается, как только данные считываются в кадры данных Spark, а ETL не содержит других манипуляций, связанных с отметкой времени. Последний шаг - запись обработанных записей в таблицы AWS Redshift.
Моя проблема в том, что ИНОГДА происходит сбой приложения с
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.sql.Timestamp
при записи в Redshift, но я полагаю, что проблема возникает из-за преобразования при чтении, и это просто ленивое выполнение Spark, которое выдает исключение при записи в Redshift (в целевых таблицах Redshift нет никаких временных отметок или столбцов даты)
В прошлом месяце и для 4 различных ежедневных заданий я получал это исключение в журналах примерно в 15% случаев, а затем задания не выполнялись, но в большинстве случаев он работает нормально, что делает невозможным воспроизведение выдавать или отлаживать дальше.
Я подозреваю, что приведение String-> Timestamp в запросе SQL создает проблему, но я не уверен, как я могу добиться того же другим способом, который не вызовет это исключение. Любая помощь с благодарностью!
Дополнительная информация о трассировке стека:
py4j.protocol.Py4JJavaError: An error occurred while calling o827.save.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:213)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
и
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 601 in stage 93.0 failed 4 times, most recent failure: Lost task 601.3 in stage 93.0 (TID 5282, url, executor 5): org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:270)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.sql.Timestamp
at com.databricks.spark.redshift.RedshiftWriter$$anonfun$7$$anonfun$apply$3.apply(RedshiftWriter.scala:234)
at com.databricks.spark.redshift.RedshiftWriter$$anonfun$7$$anonfun$apply$3.apply(RedshiftWriter.scala:233)
at com.databricks.spark.redshift.RedshiftWriter$$anonfun$8$$anonfun$apply$5.apply(RedshiftWriter.scala:252)
at com.databricks.spark.redshift.RedshiftWriter$$anonfun$8$$anonfun$apply$5.apply(RedshiftWriter.scala:248)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:324)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:259)