ConcurrentModificationException при использовании Spark collectionAccumulator - PullRequest
0 голосов
/ 26 ноября 2018

Я пытаюсь запустить приложение на основе Spark в кластере Azure HDInsight по требованию, и в нем регистрируется большое количество исключений SparkExceptions (вызванных ConcurrentModificationExceptions).Приложение запускается без этих ошибок, когда я запускаю локальный экземпляр Spark.

Я видел сообщения о подобных ошибках при использовании аккумуляторов , и мой код действительно использует CollectionAccumulator, однако я везде размещаю синхронизированные блоки, и это не имеет значения.Код, связанный с аккумулятором, выглядит следующим образом:

class MySparkClass(sc : SparkContext) {
    val myAccumulator = sc.collectionAccumulator[MyRecord]

    override def add(record: MyRecord) = {
        synchronized {
            myAccumulator.add(record)
        }
    }

    override def endOfBatch() = {
        synchronized {
            myAccumulator.value.asScala.foreach((record: MyRecord) => {
                processIt(record)
            })
        }
    }
}

Исключения не приводят к сбою приложения, однако, когда вызывается endOfBatch и код пытается прочитать значения из аккумулятора, он пусти processIt никогда не вызывается.

Мы используем HDInsight версии 3.6 с Spark версии 2.3.0

18/11/26 11:04:37 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:785)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
    at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:814)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayList.writeObject(ArrayList.java:770)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.rpc.netty.RequestMessage.serialize(NettyRpcEnv.scala:565)
    at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:231)
    at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:523)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91)
    ... 13 more

Следующий код является более автономнымпример, который воспроизводит проблему.MyRecord - это простой класс падежа, содержащий только числовые значения.Код выполняется без ошибок локально, но в кластере HDInsight выдает ошибку выше.

object MainDemo {
    def main(args: Array[String]) {
        val sparkContext = SparkSession.builder.master("local[4]").getOrCreate().sparkContext
        val myAccumulator = sparkContext.collectionAccumulator[MyRecord]

        sparkContext.binaryFiles("/my/files/here").foreach(_ => {
            for(i <- 1 to 100000) {
                val record = MyRecord(i, 0, 0)
                myAccumulator.add(record)
            }
        })

        myAccumulator.value.asScala.foreach((record: MyRecord) => {
            // we expect this to be called once for each record that we 'add' above,
            // but it is never called
            println(record)
        })
    }
}

Ответы [ 2 ]

0 голосов
/ 30 ноября 2018

Имеет смысл читать аккумулятор только после того, как было вызвано какое-то действие на СДР (collect или count).

Также вам не нужно синхронизироваться на аккумуляторе, так как независимыйего копия будет выделена на раздел.

0 голосов
/ 29 ноября 2018

Я сомневаюсь, действительно ли помогает синхронизированный блок.CustomeAccumulator или все другие аккумуляторы не являются потокобезопасными.На самом деле они не обязаны это делать, поскольку метод DAGScheduler.updateAccumulators, который используется драйвером свечи для обновления значений аккумуляторов после завершения задачи (успешно или с ошибкой), выполняется только в одном потоке, который выполняет цикл планирования.Кроме того, они являются структурами данных только для записи для рабочих, которые имеют собственную ссылку на локальный аккумулятор, тогда как доступ к значению аккумулятора разрешен только водителю.И когда вы говорите, что он работает в локальном режиме, потому что это одиночная JVM, но в кластерном режиме, это разные экземпляры JVM и java, инициируются вызовы PRC для обеспечения связи.

Как выглядит ваш объект MyRecord, и если вы просто заканчиваете свою строку с помощью .value, скорее всего, наличие итератора над ним поможет.Просто попробуйте.

myAccumulator.value
...