Как заставить параллелизм работать с записью кадров данных в таблицы кустов? - PullRequest
0 голосов
/ 28 декабря 2018

У меня есть несколько потоков в Spark 1.6, записывающих в одну и ту же таблицу кустов (с использованием файлов паркетных файлов), когда они пытаются записать в одно и то же время, возникает ошибка при переименовании части файлов записи в HDFS.Я ищу решение, чтобы обойти эту известную проблему Spark.

class MyThread extends Runnable {
          def run {
          //some code
          myTable.write.format("parquet").mode("append")
                 .saveAsTable("hdfstable")
          //some code
          }
}
Executors.defaultThreadFactory().newThread(new MyThread).start()

Я получаю эту ошибку:

org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:156)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:189)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:239)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:221)
    at fr.neolink.spark.streaming.StreamingNeo$.algo(StreamingNeo.scala:837)
    at fr.neolink.spark.streaming.StreamingNeo$$anonfun$main$3$$anonfun$apply$18$MyThread$1.run(StreamingNeo.scala:374)
    at java.lang.Thread.run(Thread.java:748)

, вызванную:

java.io.IOException: Failed to rename 
FileStatus{path=hdfs://my_hdfs_master/user/hive/warehouse/MYDB.db/hdfstable/_temporary/0/task_201812281010_1770_m_000000/part-r-00000-9a70cbea-d105-4f50-ba1b-372f555906ce.gz.parquet; 
isDirectory=false; length=4608; replication=3; blocksize=134217728; modification_time=1545988247575; 
access_time=1545988247494; owner=owner; group=hive; permission=rw-r--r--; isSymlink=false} 
to hdfs://my_hdfs_master/user/hive/warehouse/MYDB.db/hdfstable/part-r-00000-9a70cbea-d105-4f50-ba1b-372f555906ce.gz.parquet

Я нашелэта проблема на jira: https://issues.apache.org/jira/browse/SPARK-18626

Есть ли способ сделать поток пишущей детали безопасным?чтобы выполнить исполнение по одному, один за другим?

Спасибо.

1 Ответ

0 голосов
/ 28 декабря 2018

РЕШЕНИЕ

Использование this.synchronized{} как показано ниже

class MyThread extends Runnable{
      def run{
      //some code
         this.synchronized{
            myTable.write.format("parquet").mode("append")
                   .saveAsTable("hdfstable")
         }
      //some code
      }
}
Executors.defaultThreadFactory().newThread(new MyThread).start()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...