Ценю вашу помощь заранее.Уже 2-й день я продолжал пробовать все перестановки и комбинации, не мог найти никакого решения.
- У меня есть настройка hadoop / spark с 4 узлами на azurehdinsight
- У меня есть простой код scala, который читаетиз HDFS и выполняет простые операции, такие как создание столбца в нижний регистр и т. д. Небольшой файл 434 КБ с 2300 записями с 14 столбцами.
- Код работает на моем локальном ноутбуке (win 10), а также на автономном кластере виртуальной машины Azure Ubuntu.
- Проблема - при записи в CSV выдается java.util.concurrent.TimeoutException: время ожидания фьючерса истекло [100000 миллисекунд]. Заметил, что у меня есть несколько частичных файлов в месте вывода, то есть
/data/8/1/data.csv
с /data/8/1/data.csv/_temporary
- Важные сегменты кода:
def get_spark(): SparkSession = {
val spark = org.apache.spark.sql.SparkSession.builder
.appName("My App").enableHiveSupport().getOrCreate;
return spark
}
val spark = get_spark()
val o_file = "/data/8/1/data.csv"
val loader = new Loader(spark)
var df = loader.load(i_file)
logger.info("Writing output to: {}", o_file)
df.write.mode("overwrite").option("header", "true").csv(o_file)<--throwing error here
И исключение составляет:
18/11/29 19:06:05 INFO LoaderProcessor: Writing output to: /data/8/1/data.csv 18/11/29 19:07:37 ERROR ApplicationMaster: Uncaught exception: java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201) at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:498) at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$runImpl(ApplicationMaster.scala:345) at org.apache.spark.deploy.yarn.ApplicationMaster$anonfun$run$2.apply$mcV$sp(ApplicationMaster.scala:260) at org.apache.spark.deploy.yarn.ApplicationMaster$anonfun$run$2.apply(ApplicationMaster.scala:260) at org.apache.spark.deploy.yarn.ApplicationMaster$anonfun$run$2.apply(ApplicationMaster.scala:260) at org.apache.spark.deploy.yarn.ApplicationMaster$anon$5.run(ApplicationMaster.scala:815) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869) at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:814) at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:259) at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:839) at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala) 18/11/29 19:07:37 ERROR FileFormatWriter: Aborting job null. java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202) at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153) at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:222) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:633) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102) at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122) at org.apache.spark.sql.execution.SparkPlan$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) at org.apache.spark.sql.DataFrameWriter$anonfun$runCommand$1.apply(DataFrameWriter.scala:654) at org.apache.spark.sql.DataFrameWriter$anonfun$runCommand$1.apply(DataFrameWriter.scala:654) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225) at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:642) at zettasense.LoaderProcessor.execute(LoaderProcessor.scala:81) at zettasense.Processor$.main(Processor.scala:47) at zettasense.Processor.main(Processor.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.yarn.ApplicationMaster$anon$4.run(ApplicationMaster.scala:721
ДОПОЛНИТЕЛЬНАЯ ИНФОРМАЦИЯ
Также заметил, что операция «запись» продолжает запись в выходную папку следующим образом:
part-00000-3e9566ae-c13c-468a-8732-e7b8a8df5335-c000.csv
part-00000-3e9566ae-c13c-468a-8732-e7b8a8df5335-c001.csv
---
, а затем в течение нескольких секунд:
part-00000-4f4979a0-d9f9-481b-aac4-115e63b9f59c-c000.csv
part-00000-4f4979a0-d9f9-481b-aac4-115e63b9f59c-c001.csv
---
Чтобы подтвердить это, яd reartioning, как это:
df = df.repartition(1)
Заметил то же самое.он продолжает записывать файл снова и снова.
Я также получаю это исключение сейчас:
java.lang.IllegalStateException: User did not initialize spark context! at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:510) at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$runImpl(ApplicationMaster.scala:345) at org.apache.spark.deploy.yarn.ApplicationMaster$anonfun$run$2.apply$mcV$sp(ApplicationMaster.scala:260) at org.apache.spark.deploy.yarn.ApplicationMaster$anonfun$run$2.apply(ApplicationMaster.scala:260) at org.apache.spark.deploy.yarn.ApplicationMaster$anonfun$run$2.apply(ApplicationMaster.scala:260) at org.apache.spark.deploy.yarn.ApplicationMaster$anon$5.run(ApplicationMaster.scala:815) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869) at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:814) at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:259) at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:839) at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)