как избежать состояния гонки при использовании Scala Actor - PullRequest
4 голосов
/ 23 апреля 2011

Я пишу кусок кода, который будет заполнять коллекцию mongoDB, когда буфер (список) увеличится до определенного размера.

import scala.actors.Actor
import com.mongodb.casbah.Imports._
import scala.collection.mutable.ListBuffer

class PopulateDB extends Actor {
  val buffer = new ListBuffer[DBObject]
  val mongoConn = MongoConnection()
  val mongoCol = mongoConn("casbah_test")("logs")

  def add(info: DBObject = null) {
    if (info != null) buffer += info

    if (buffer.size > 0 && (info == null || buffer.length >= 1000)) {
      mongoCol.insert(buffer.toList)
      buffer.clear
      println("adding a batch")
    }
  }

  def act() {
    loop {
      react {
        case info: DBObject => add(info)

        case msg if msg == "closeConnection" =>
          println("Close connection")
          add()
          mongoConn.close
      }
    }
  }
}

Однако, когда я запускаю следующий код, scala иногда выдает «ConcurrentModificationException» в строке «mongoCol.insert (buffer.toList)».Я уверен, что это как-то связано с "mongoCol.insert".Мне интересно, есть ли что-то принципиально не так с кодом.Или я должен использовать что-то вроде «atomic {...}» от Akka, чтобы избежать этой проблемы.

Вот полный след стека:

PopulateDB@7e859a68: caught java.util.ConcurrentModificationException
java.util.ConcurrentModificationException
    at java.util.LinkedHashMap$LinkedHashIterator.nextEntry(LinkedHashMap.java:373)
    at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:392)
    at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:391)
    at org.bson.BSONEncoder.putObject(BSONEncoder.java:113)
    at org.bson.BSONEncoder.putObject(BSONEncoder.java:67)
    at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:215)
    at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:180)
    at com.mongodb.DBCollection.insert(DBCollection.java:85)
    at com.mongodb.casbah.MongoCollectionBase$class.insert(MongoCollection.scala:561)
    at com.mongodb.casbah.MongoCollection.insert(MongoCollection.scala:864)
    at PopulateDB.add(PopulateDB.scala:14)
    at PopulateDB$$anonfun$act$1$$anonfun$apply$1.apply(PopulateDB.scala:26)
    at PopulateDB$$anonfun$act$1$$anonfun$apply$1.apply(PopulateDB.scala:25)
    at scala.actors.ReactorTask.run(ReactorTask.scala:34)
    at scala.actors.Reactor$class.resumeReceiver(Reactor.scala:129)
    at PopulateDB.scala$actors$ReplyReactor$$super$resumeReceiver(PopulateDB.scala:5)
    at scala.actors.ReplyReactor$class.resumeReceiver(ReplyReactor.scala:69)
    at PopulateDB.resumeReceiver(PopulateDB.scala:5)
    at scala.actors.Actor$class.searchMailbox(Actor.scala:478)
    at PopulateDB.searchMailbox(PopulateDB.scala:5)
    at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
    at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
    at scala.actors.ReactorTask.run(ReactorTask.scala:36)
    at scala.concurrent.forkjoin.ForkJoinPool$AdaptedRunnable.exec(ForkJoinPool.java:611)
    at scala.concurrent.forkjoin.ForkJoinTask.quietlyExec(ForkJoinTask.java:422)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.mainLoop(ForkJoinWorkerThread.java:340)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:325)

Спасибо, Дерек

Ответы [ 2 ]

4 голосов
/ 25 апреля 2011

DBObject является не потокобезопасным;вы отправляете объект DBObject с вашим сообщением актера.Вполне вероятно, что позже он снова будет изменен, что вызовет эту проблему одновременной модификации.

Я бы предложил начать с попытки использовать clone() для объекта DBObject, когда он входит в субъект, и поместить его вваш буфер.Это только поверхностная копия, но, по крайней мере, ее должно быть достаточно, чтобы вызвать проблемы с одновременной модификацией в LinkedHashMap, который поддерживает ключи в DBObject (который хранится в порядке, благодаря LHM).

Я бы попробовал:

  def act() {
    loop {
      react {
        case info: DBObject => add(info.clone())

        case msg if msg == "closeConnection" =>
          println("Close connection")
          add()
          mongoConn.close
      }
    }
  }

Если это не сработает, посмотрите где-либо еще, где вы модифицируете объект DBObject после его отправки в Actor.

1 голос
/ 24 апреля 2011

Почему class ниже?

class PopulateDB extends Actor

У вас есть несколько PupulateDB актеров? Я ожидаю object PopulateDB extends Actor, так что один актер сконцентрирует эту задачу.

Кроме того, проблема, похоже, заключается в самой casbah или mongodb.

...