Проблема Pyspark с метками времени при чтении БД MySQL - PullRequest
0 голосов
/ 28 марта 2019
Python 2.7
Pyspark 2.2.1
JDBC format for MySQL->Spark DF
For writing Spark DF-> AWS Redshift i am using the `Spark-Redshift` driver from Databricks.

Я читаю данные в Spark из таблиц MySQL для моего приложения, из-за контекста и в зависимости от входных аргументов, мне нужно получить либо все записи, которые были обновлены до сегодняшнего дня, либо только записи, которые были обновлены до запрошенной даты ( входит в комплект).

spark.read.format("jdbc")
            .option("url", "url")
            .option("driver", driver)
            .option("dbtable", query)
            .load()

и запрос

if days > 0:
    get_date = date.today() - timedelta(days)
    query = "(SELECT * FROM {} WHERE CAST({}.updatedAt AS date) >= DATE('{}') " \
            "AND CAST({}.updatedAt AS date) < CURDATE()) AS t".format(table, table, get_date, table)
elif days == 0:
    query = "(SELECT * FROM {} WHERE CAST({}.updatedAt AS date) < CURDATE() " \
            "OR updatedAt IS NULL) AS t".format(table, table)

Этот столбец отметок времени отбрасывается, как только данные считываются в кадры данных Spark, а ETL не содержит других манипуляций, связанных с отметкой времени. Последний шаг - запись обработанных записей в таблицы AWS Redshift.

Моя проблема в том, что ИНОГДА происходит сбой приложения с Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.sql.Timestamp при записи в Redshift, но я полагаю, что проблема возникает из-за преобразования при чтении, и это просто ленивое выполнение Spark, которое выдает исключение при записи в Redshift (в целевых таблицах Redshift нет никаких временных отметок или столбцов даты)

В прошлом месяце и для 4 различных ежедневных заданий я получал это исключение в журналах примерно в 15% случаев, а затем задания не выполнялись, но в большинстве случаев он работает нормально, что делает невозможным воспроизведение выдавать или отлаживать дальше.

Я подозреваю, что приведение String-> Timestamp в запросе SQL создает проблему, но я не уверен, как я могу добиться того же другим способом, который не вызовет это исключение. Любая помощь с благодарностью!

Дополнительная информация о трассировке стека:

py4j.protocol.Py4JJavaError: An error occurred while calling o827.save.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:213)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)

и

at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:609)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 601 in stage 93.0 failed 4 times, most recent failure: Lost task 601.3 in stage 93.0 (TID 5282, url, executor 5): org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:270)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.sql.Timestamp
at com.databricks.spark.redshift.RedshiftWriter$$anonfun$7$$anonfun$apply$3.apply(RedshiftWriter.scala:234)
at com.databricks.spark.redshift.RedshiftWriter$$anonfun$7$$anonfun$apply$3.apply(RedshiftWriter.scala:233)
at com.databricks.spark.redshift.RedshiftWriter$$anonfun$8$$anonfun$apply$5.apply(RedshiftWriter.scala:252)
at com.databricks.spark.redshift.RedshiftWriter$$anonfun$8$$anonfun$apply$5.apply(RedshiftWriter.scala:248)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:324)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:259)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...